Python 비동기 작업에서 asyncio.gather 예외 처리 패턴
문제 상황
데이터 파이프라인에서 10개 외부 API를 동시에 호출하는 작업이 있었다. 하나라도 실패하면 전체가 중단되는 문제가 발생했고, 성공한 데이터만이라도 처리해야 하는 요구사항이 추가되었다.
기존 코드
import asyncio
import aiohttp
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
urls = [f"https://api.example.com/data/{i}" for i in range(10)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks) # 하나 실패 시 전체 실패
이 코드는 한 API라도 에러를 던지면 전체 gather가 취소되었다.
해결 방법
return_exceptions=True 옵션을 사용하면 예외를 결과 리스트에 포함시킨다.
async def fetch_data_safe(session, url, index):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
data = await response.json()
return {"index": index, "success": True, "data": data}
except Exception as e:
return {"index": index, "success": False, "error": str(e)}
async def main():
urls = [f"https://api.example.com/data/{i}" for i in range(10)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data_safe(session, url, i) for i, url in enumerate(urls)]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if isinstance(r, dict) and r.get("success")]
failures = [r for r in results if isinstance(r, dict) and not r.get("success")]
print(f"성공: {len(successes)}, 실패: {len(failures)}")
return successes
추가 개선
타임아웃과 재시도 로직도 추가했다. semaphore로 동시 요청 수를 제한해 API 레이트 리미트도 관리할 수 있었다.
semaphore = asyncio.Semaphore(5) # 최대 5개 동시 요청
async def fetch_with_limit(session, url, index):
async with semaphore:
return await fetch_data_safe(session, url, index)
결과적으로 일부 API 장애 상황에서도 파이프라인이 안정적으로 동작하게 되었다.