Python 멀티프로세싱으로 CSV 대용량 처리 속도 개선

문제 상황

매일 새벽 실행되는 데이터 정제 배치 작업이 최근 처리량 증가로 4시간 이상 소요되기 시작했다. 200만 건의 CSV 파일을 읽어 각 행마다 유효성 검증, 데이터 변환, DB 적재를 순차 처리하는 구조였다.

기존 코드

import csv

def process_row(row):
    # 무거운 변환 로직
    validated = validate_data(row)
    transformed = transform_data(validated)
    save_to_db(transformed)

with open('data.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        process_row(row)

단순 순차 처리라 CPU 코어를 하나만 사용하고 있었다.

해결 방법

multiprocessing.Pool을 사용해 CPU 코어 수만큼 프로세스를 생성했다. 파일을 청크 단위로 나눠 병렬 처리하도록 변경했다.

from multiprocessing import Pool, cpu_count
import csv

def process_chunk(rows):
    results = []
    for row in rows:
        validated = validate_data(row)
        transformed = transform_data(validated)
        results.append(transformed)
    return results

def chunk_csv(filename, chunk_size=10000):
    with open(filename, 'r') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

if __name__ == '__main__':
    pool = Pool(cpu_count())
    chunks = list(chunk_csv('data.csv'))
    results = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()
    
    # 결과 일괄 DB 저장
    for chunk_result in results:
        bulk_insert_db(chunk_result)

결과

  • 처리 시간: 4시간 20분 → 55분 (약 75% 단축)
  • 8코어 서버 기준 CPU 사용률 90% 이상 유지
  • DB 저장을 청크 단위 벌크 인서트로 변경해 추가 개선

주의사항

DB 커넥션은 각 프로세스마다 새로 생성해야 한다. 공유 메모리 이슈로 부모 프로세스의 커넥션을 그대로 쓰면 에러가 발생했다.