Python 멀티프로세싱으로 대용량 CSV 처리 속도 개선
문제 상황
매일 밤 실행되는 데이터 집계 배치 작업이 있었다. 약 150만 건의 CSV를 읽어서 가공 후 DB에 저장하는 작업인데, 처리 시간이 2시간을 넘어가면서 다음날 업무 시작 전에 완료되지 않는 문제가 발생했다.
기존 코드
import csv
def process_row(row):
# 데이터 변환 로직
return transformed_data
with open('data.csv', 'r') as f:
reader = csv.DictReader(f)
for row in reader:
result = process_row(row)
save_to_db(result)
단순한 순차 처리였다. CPU 사용률을 확인해보니 25% 정도만 사용하고 있었다. 4코어 서버인데 1개만 쓰고 있던 것.
멀티프로세싱 적용
multiprocessing.Pool을 사용해서 작업을 병렬화했다.
from multiprocessing import Pool
import csv
def process_chunk(rows):
results = []
for row in rows:
results.append(process_row(row))
return results
def chunks(data, size):
for i in range(0, len(data), size):
yield data[i:i + size]
if __name__ == '__main__':
with open('data.csv', 'r') as f:
reader = csv.DictReader(f)
rows = list(reader)
with Pool(processes=4) as pool:
chunk_size = 10000
results = pool.map(process_chunk, chunks(rows, chunk_size))
# DB 저장은 메인 프로세스에서
for result_chunk in results:
bulk_insert_to_db(result_chunk)
결과 및 주의사항
- 처리 시간: 2시간 10분 → 35분으로 단축
- CPU 사용률 90% 이상으로 상승
주의할 점:
- 메모리 사용량이 증가한다. 전체 CSV를 메모리에 올리므로 파일 크기에 따라 조정 필요
- DB 연결은 각 프로세스마다 별도로 생성해야 함 (pickle 직렬화 문제)
if __name__ == '__main__'가드 필수
파일이 메모리에 다 안 올라가는 경우라면 multiprocessing.Queue를 사용한 producer-consumer 패턴을 고려해볼 만하다. 다음에 필요하면 그때 적용해보려고 한다.