Python asyncio 기반 배치 작업 성능 개선 기록

문제 상황

매일 밤 실행되는 데이터 수집 배치가 4시간 이상 소요되면서 점점 문제가 되었다. 약 50만 건의 레코드를 순회하며 외부 API를 호출하는 구조였는데, 동기 방식으로 구현되어 있어 I/O 대기 시간이 대부분이었다.

접근 방식

처음엔 멀티프로세싱을 고려했지만, 메모리 오버헤드와 공유 자원 관리가 복잡해질 것 같아 asyncio를 선택했다. 외부 API 호출이 bottleneck이었기 때문에 비동기 처리가 적합하다고 판단했다.

import asyncio
import aiohttp
from typing import List

async def fetch_data(session: aiohttp.ClientSession, item_id: str):
    async with session.get(f'https://api.example.com/items/{item_id}') as response:
        return await response.json()

async def process_batch(items: List[str], batch_size: int = 100):
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            tasks = [fetch_data(session, item_id) for item_id in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # 결과 처리
            await save_results(results)

시행착오

처음엔 모든 요청을 한 번에 gather로 처리했다가 메모리 이슈와 rate limit에 걸렸다. 배치 단위로 나누고, asyncio.Semaphore로 동시 실행 수를 제한했다.

semaphore = asyncio.Semaphore(50)

async def fetch_with_limit(session, item_id):
    async with semaphore:
        return await fetch_data(session, item_id)

결과

  • 실행 시간: 4시간 → 30분 (약 8배 개선)
  • CPU 사용률: 거의 변화 없음
  • 메모리: 배치 크기 조정으로 안정적 유지

단, 에러 핸들링이 더 복잡해졌고 디버깅이 어려워진 점은 트레이드오프였다. return_exceptions=True로 부분 실패를 허용하고, 실패한 항목은 별도 재처리 큐에 넣는 방식으로 안정성을 확보했다.