FastAPI 비동기 배치 처리 성능 개선

문제 상황

외부 파트너사 API에서 500개 이상의 상품 정보를 가져와야 하는 배치 작업이 있었다. 순차 처리 시 약 2분이 소요되어 사용자 경험이 좋지 않았고, 무분별한 병렬 처리는 외부 API의 rate limit에 걸렸다.

해결 방법

세마포어를 활용한 동시성 제어

import asyncio
from typing import List
import httpx

class ProductFetcher:
    def __init__(self, max_concurrent: int = 20):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = httpx.AsyncClient(timeout=10.0)
    
    async def fetch_product(self, product_id: str) -> dict:
        async with self.semaphore:
            response = await self.client.get(
                f"https://api.partner.com/products/{product_id}"
            )
            return response.json()
    
    async def fetch_batch(self, product_ids: List[str]) -> List[dict]:
        tasks = [self.fetch_product(pid) for pid in product_ids]
        return await asyncio.gather(*tasks, return_exceptions=True)

에러 핸들링과 재시도

일부 요청이 실패해도 전체가 중단되지 않도록 return_exceptions=True를 사용했다. 실패한 요청은 별도로 재시도 로직을 태웠다.

async def process_with_retry(self, product_ids: List[str]) -> dict:
    results = await self.fetch_batch(product_ids)
    
    failed_ids = []
    success_data = []
    
    for pid, result in zip(product_ids, results):
        if isinstance(result, Exception):
            failed_ids.append(pid)
        else:
            success_data.append(result)
    
    if failed_ids:
        await asyncio.sleep(1)
        retry_results = await self.fetch_batch(failed_ids)
        success_data.extend([r for r in retry_results if not isinstance(r, Exception)])
    
    return {"success": success_data, "failed_count": len(failed_ids)}

결과

  • 평균 응답 시간: 120초 → 35초
  • 동시 요청 수를 20개로 제한하여 rate limit 회피
  • 에러율 0.3% 미만 유지

세마포어 값은 파트너사 API 스펙과 여러 번의 테스트를 통해 조정했다. 처음엔 50으로 설정했다가 429 에러가 빈발해서 20으로 낮췄다.