Python 멀티프로세싱으로 대용량 CSV 처리 속도 개선

문제 상황

매일 밤 실행되는 데이터 집계 배치 작업이 있었다. 약 150만 건의 CSV를 읽어서 가공 후 DB에 저장하는 작업인데, 처리 시간이 2시간을 넘어가면서 다음날 업무 시작 전에 완료되지 않는 문제가 발생했다.

기존 코드

import csv

def process_row(row):
    # 데이터 변환 로직
    return transformed_data

with open('data.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        result = process_row(row)
        save_to_db(result)

단순한 순차 처리였다. CPU 사용률을 확인해보니 25% 정도만 사용하고 있었다. 4코어 서버인데 1개만 쓰고 있던 것.

멀티프로세싱 적용

multiprocessing.Pool을 사용해서 작업을 병렬화했다.

from multiprocessing import Pool
import csv

def process_chunk(rows):
    results = []
    for row in rows:
        results.append(process_row(row))
    return results

def chunks(data, size):
    for i in range(0, len(data), size):
        yield data[i:i + size]

if __name__ == '__main__':
    with open('data.csv', 'r') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    
    with Pool(processes=4) as pool:
        chunk_size = 10000
        results = pool.map(process_chunk, chunks(rows, chunk_size))
    
    # DB 저장은 메인 프로세스에서
    for result_chunk in results:
        bulk_insert_to_db(result_chunk)

결과 및 주의사항

  • 처리 시간: 2시간 10분 → 35분으로 단축
  • CPU 사용률 90% 이상으로 상승

주의할 점:

  1. 메모리 사용량이 증가한다. 전체 CSV를 메모리에 올리므로 파일 크기에 따라 조정 필요
  2. DB 연결은 각 프로세스마다 별도로 생성해야 함 (pickle 직렬화 문제)
  3. if __name__ == '__main__' 가드 필수

파일이 메모리에 다 안 올라가는 경우라면 multiprocessing.Queue를 사용한 producer-consumer 패턴을 고려해볼 만하다. 다음에 필요하면 그때 적용해보려고 한다.