Python asyncio 기반 배치 작업 성능 개선 기록
문제 상황
매일 밤 실행되는 데이터 수집 배치가 4시간 이상 소요되면서 점점 문제가 되었다. 약 50만 건의 레코드를 순회하며 외부 API를 호출하는 구조였는데, 동기 방식으로 구현되어 있어 I/O 대기 시간이 대부분이었다.
접근 방식
처음엔 멀티프로세싱을 고려했지만, 메모리 오버헤드와 공유 자원 관리가 복잡해질 것 같아 asyncio를 선택했다. 외부 API 호출이 bottleneck이었기 때문에 비동기 처리가 적합하다고 판단했다.
import asyncio
import aiohttp
from typing import List
async def fetch_data(session: aiohttp.ClientSession, item_id: str):
async with session.get(f'https://api.example.com/items/{item_id}') as response:
return await response.json()
async def process_batch(items: List[str], batch_size: int = 100):
async with aiohttp.ClientSession() as session:
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [fetch_data(session, item_id) for item_id in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 결과 처리
await save_results(results)
시행착오
처음엔 모든 요청을 한 번에 gather로 처리했다가 메모리 이슈와 rate limit에 걸렸다. 배치 단위로 나누고, asyncio.Semaphore로 동시 실행 수를 제한했다.
semaphore = asyncio.Semaphore(50)
async def fetch_with_limit(session, item_id):
async with semaphore:
return await fetch_data(session, item_id)
결과
- 실행 시간: 4시간 → 30분 (약 8배 개선)
- CPU 사용률: 거의 변화 없음
- 메모리: 배치 크기 조정으로 안정적 유지
단, 에러 핸들링이 더 복잡해졌고 디버깅이 어려워진 점은 트레이드오프였다. return_exceptions=True로 부분 실패를 허용하고, 실패한 항목은 별도 재처리 큐에 넣는 방식으로 안정성을 확보했다.