Python asyncio로 외부 API 호출 최적화한 경험

문제 상황

매일 새벽 실행되는 배치 작업이 있었다. 약 500개의 항목에 대해 외부 API를 호출해 데이터를 동기화하는 작업인데, 순차 처리로 인해 10분 넘게 걸렸다.

for item in items:
    response = requests.get(f'{API_URL}/{item.id}')
    update_database(item.id, response.json())

asyncio 도입

aiohttp와 asyncio를 사용해 비동기로 전환했다.

import asyncio
import aiohttp

async def fetch_item(session, item):
    async with session.get(f'{API_URL}/{item.id}') as response:
        data = await response.json()
        return item.id, data

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_item(session, item) for item in items]
        results = await asyncio.gather(*tasks)
        
    for item_id, data in results:
        update_database(item_id, data)

마주친 문제들

1. DB 커넥션 이슈

기존 SQLAlchemy 세션을 그대로 사용했더니 thread-safety 오류가 발생했다. 각 코루틴마다 별도 세션을 생성하거나, 결과를 모아서 한 번에 처리하는 방식으로 해결했다.

2. Rate Limiting

동시에 500개 요청이 나가면서 API 서버에서 429 에러가 발생했다. asyncio.Semaphore로 동시 요청 수를 20개로 제한했다.

sem = asyncio.Semaphore(20)

async def fetch_item(session, item):
    async with sem:
        async with session.get(f'{API_URL}/{item.id}') as response:
            return await response.json()

결과

처리 시간이 10분에서 1분 30초로 단축되었다. 동시성 제어와 에러 핸들링만 잘 챙기면 asyncio는 I/O 바운드 작업에서 확실히 효과적이었다.