Python 비동기 처리 도입기: asyncio와 aiohttp로 API 호출 성능 개선
문제 상황
매일 새벽에 실행되는 데이터 수집 배치가 있었다. 약 5000개의 아이템에 대해 외부 API를 호출해 정보를 가져오는 작업인데, 실행 시간이 40분을 넘어가면서 다음 작업 스케줄과 겹치는 문제가 발생했다.
기존 코드는 requests를 사용한 단순한 for loop 구조였다.
import requests
def fetch_item_info(item_id):
response = requests.get(f'https://api.example.com/items/{item_id}')
return response.json()
for item_id in item_ids: # 5000개
data = fetch_item_info(item_id)
save_to_db(data)
각 API 호출이 평균 500ms가 걸렸고, 순차 처리라 전체 시간이 선형적으로 증가했다.
asyncio + aiohttp 도입
Python 3.7부터 asyncio가 안정화되면서 본격적으로 사용할 만해졌다. aiohttp를 사용해 비동기 HTTP 요청을 처리하도록 변경했다.
import asyncio
import aiohttp
async def fetch_item_info(session, item_id):
async with session.get(f'https://api.example.com/items/{item_id}') as response:
return await response.json()
async def process_batch(item_ids):
async with aiohttp.ClientSession() as session:
tasks = [fetch_item_info(session, item_id) for item_id in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
results = asyncio.run(process_batch(item_ids))
결과 및 개선사항
처리 시간이 40분에서 8분으로 단축되었다. 하지만 초기 테스트에서 몇 가지 문제가 있었다.
- 동시 연결 수 제한: 5000개 요청을 한 번에 보내니 서버에서 429 에러가 발생했다. Semaphore로 동시 요청을 50개로 제한했다.
semaphore = asyncio.Semaphore(50)
async def fetch_with_limit(session, item_id):
async with semaphore:
return await fetch_item_info(session, item_id)
-
에러 핸들링: gather의
return_exceptions=True옵션으로 일부 실패해도 전체 작업이 중단되지 않도록 했다. -
Connection Pool: ClientSession의 connector에 limit 설정으로 소켓 고갈 방지
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector) as session:
# ...
마무리
비동기 처리가 만능은 아니지만, I/O bound 작업에서는 확실히 효과적이었다. 다만 디버깅이 동기 코드보다 까다롭고, 팀 내 asyncio 경험이 부족해 코드 리뷰에 시간이 더 걸렸다. 점진적으로 다른 배치 작업에도 적용할 예정이다.