Python 멀티프로세싱으로 CSV 대용량 처리 속도 개선
문제 상황
매일 새벽 실행되는 데이터 정제 배치 작업이 최근 처리량 증가로 4시간 이상 소요되기 시작했다. 200만 건의 CSV 파일을 읽어 각 행마다 유효성 검증, 데이터 변환, DB 적재를 순차 처리하는 구조였다.
기존 코드
import csv
def process_row(row):
# 무거운 변환 로직
validated = validate_data(row)
transformed = transform_data(validated)
save_to_db(transformed)
with open('data.csv', 'r') as f:
reader = csv.DictReader(f)
for row in reader:
process_row(row)
단순 순차 처리라 CPU 코어를 하나만 사용하고 있었다.
해결 방법
multiprocessing.Pool을 사용해 CPU 코어 수만큼 프로세스를 생성했다. 파일을 청크 단위로 나눠 병렬 처리하도록 변경했다.
from multiprocessing import Pool, cpu_count
import csv
def process_chunk(rows):
results = []
for row in rows:
validated = validate_data(row)
transformed = transform_data(validated)
results.append(transformed)
return results
def chunk_csv(filename, chunk_size=10000):
with open(filename, 'r') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
if __name__ == '__main__':
pool = Pool(cpu_count())
chunks = list(chunk_csv('data.csv'))
results = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 결과 일괄 DB 저장
for chunk_result in results:
bulk_insert_db(chunk_result)
결과
- 처리 시간: 4시간 20분 → 55분 (약 75% 단축)
- 8코어 서버 기준 CPU 사용률 90% 이상 유지
- DB 저장을 청크 단위 벌크 인서트로 변경해 추가 개선
주의사항
DB 커넥션은 각 프로세스마다 새로 생성해야 한다. 공유 메모리 이슈로 부모 프로세스의 커넥션을 그대로 쓰면 에러가 발생했다.