Python 멀티프로세싱으로 대용량 CSV 처리 개선
문제 상황
배치 작업에서 매일 300만 건 이상의 CSV 데이터를 파싱하고 가공해서 DB에 적재하는 작업이 있었다. 단일 프로세스로 순차 처리하니 약 40분이 걸렸고, 이게 병목이 되어 다음 작업들이 지연되는 문제가 발생했다.
해결 과정
Python의 multiprocessing 모듈을 사용해 CPU 코어를 최대한 활용하기로 했다. GIL 때문에 threading으로는 CPU-bound 작업 개선이 어렵다는 점을 고려했다.
from multiprocessing import Pool
import csv
def process_chunk(chunk):
results = []
for row in chunk:
# 데이터 가공 로직
processed = transform_row(row)
results.append(processed)
return results
def split_csv(filename, num_chunks):
with open(filename, 'r') as f:
reader = csv.reader(f)
header = next(reader)
rows = list(reader)
chunk_size = len(rows) // num_chunks
return [rows[i:i+chunk_size] for i in range(0, len(rows), chunk_size)]
if __name__ == '__main__':
chunks = split_csv('data.csv', 8)
with Pool(processes=8) as pool:
results = pool.map(process_chunk, chunks)
# 결과 병합 및 DB 적재
flattened = [item for sublist in results for item in sublist]
결과
8코어 서버에서 처리 시간이 40분에서 10분으로 단축되었다. 다만 메모리 사용량이 증가해서 청크 크기 조절이 필요했다. 프로세스 간 통신 오버헤드를 고려해 적절한 청크 사이즈를 찾는 게 중요했다.
실제로는 DB 적재 부분이 새로운 병목이 되어, bulk insert를 추가로 적용해야 했다. 병렬 처리가 만능은 아니고 전체 파이프라인을 봐야 한다는 걸 다시 확인했다.