FastAPI 비동기 배치 처리 성능 개선
문제 상황
외부 파트너사 API에서 500개 이상의 상품 정보를 가져와야 하는 배치 작업이 있었다. 순차 처리 시 약 2분이 소요되어 사용자 경험이 좋지 않았고, 무분별한 병렬 처리는 외부 API의 rate limit에 걸렸다.
해결 방법
세마포어를 활용한 동시성 제어
import asyncio
from typing import List
import httpx
class ProductFetcher:
def __init__(self, max_concurrent: int = 20):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.client = httpx.AsyncClient(timeout=10.0)
async def fetch_product(self, product_id: str) -> dict:
async with self.semaphore:
response = await self.client.get(
f"https://api.partner.com/products/{product_id}"
)
return response.json()
async def fetch_batch(self, product_ids: List[str]) -> List[dict]:
tasks = [self.fetch_product(pid) for pid in product_ids]
return await asyncio.gather(*tasks, return_exceptions=True)
에러 핸들링과 재시도
일부 요청이 실패해도 전체가 중단되지 않도록 return_exceptions=True를 사용했다. 실패한 요청은 별도로 재시도 로직을 태웠다.
async def process_with_retry(self, product_ids: List[str]) -> dict:
results = await self.fetch_batch(product_ids)
failed_ids = []
success_data = []
for pid, result in zip(product_ids, results):
if isinstance(result, Exception):
failed_ids.append(pid)
else:
success_data.append(result)
if failed_ids:
await asyncio.sleep(1)
retry_results = await self.fetch_batch(failed_ids)
success_data.extend([r for r in retry_results if not isinstance(r, Exception)])
return {"success": success_data, "failed_count": len(failed_ids)}
결과
- 평균 응답 시간: 120초 → 35초
- 동시 요청 수를 20개로 제한하여 rate limit 회피
- 에러율 0.3% 미만 유지
세마포어 값은 파트너사 API 스펙과 여러 번의 테스트를 통해 조정했다. 처음엔 50으로 설정했다가 429 에러가 빈발해서 20으로 낮췄다.