Python 멀티프로세싱으로 대용량 CSV 처리 개선

문제 상황

배치 작업에서 매일 300만 건 이상의 CSV 데이터를 파싱하고 가공해서 DB에 적재하는 작업이 있었다. 단일 프로세스로 순차 처리하니 약 40분이 걸렸고, 이게 병목이 되어 다음 작업들이 지연되는 문제가 발생했다.

해결 과정

Python의 multiprocessing 모듈을 사용해 CPU 코어를 최대한 활용하기로 했다. GIL 때문에 threading으로는 CPU-bound 작업 개선이 어렵다는 점을 고려했다.

from multiprocessing import Pool
import csv

def process_chunk(chunk):
    results = []
    for row in chunk:
        # 데이터 가공 로직
        processed = transform_row(row)
        results.append(processed)
    return results

def split_csv(filename, num_chunks):
    with open(filename, 'r') as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    
    chunk_size = len(rows) // num_chunks
    return [rows[i:i+chunk_size] for i in range(0, len(rows), chunk_size)]

if __name__ == '__main__':
    chunks = split_csv('data.csv', 8)
    
    with Pool(processes=8) as pool:
        results = pool.map(process_chunk, chunks)
    
    # 결과 병합 및 DB 적재
    flattened = [item for sublist in results for item in sublist]

결과

8코어 서버에서 처리 시간이 40분에서 10분으로 단축되었다. 다만 메모리 사용량이 증가해서 청크 크기 조절이 필요했다. 프로세스 간 통신 오버헤드를 고려해 적절한 청크 사이즈를 찾는 게 중요했다.

실제로는 DB 적재 부분이 새로운 병목이 되어, bulk insert를 추가로 적용해야 했다. 병렬 처리가 만능은 아니고 전체 파이프라인을 봐야 한다는 걸 다시 확인했다.