Python 비동기 처리에서 asyncio.gather()와 as_completed() 선택 기준
문제 상황
외부 API 1000개 엔드포인트를 호출하는 크롤러를 운영 중이었다. 기존에는 asyncio.gather()로 구현했는데, 일부 요청이 실패해도 전체가 멈추지 않도록 개선이 필요했다.
asyncio.gather()의 특징
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [f'https://api.example.com/data/{i}' for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
return_exceptions=True 옵션을 사용하면 개별 실패가 전체를 멈추지 않는다. 하지만 모든 작업이 완료될 때까지 기다려야 하고, 결과가 입력 순서대로 반환된다.
as_completed()로 전환
완료되는 즉시 처리하고 싶어서 as_completed()로 변경했다.
async def main_streaming():
urls = [f'https://api.example.com/data/{i}' for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
for coro in asyncio.as_completed(tasks):
try:
result = await coro
await process_result(result) # 즉시 처리
except Exception as e:
logger.error(f"Failed: {e}")
차이점 정리
asyncio.gather()
- 모든 작업 완료 후 일괄 반환
- 순서 보장
- 메모리에 모든 결과 적재
as_completed()
- 완료되는 대로 순차 처리
- 순서 보장 안 됨
- 메모리 효율적 (스트리밍)
우리 케이스는 결과를 DB에 즉시 저장하므로 순서가 중요하지 않았다. as_completed()로 변경 후 메모리 사용량이 40% 감소했고, 첫 결과 처리 시간이 3초에서 0.2초로 개선됐다.
결론
- 결과를 모아서 한 번에 처리:
gather() - 완료 즉시 처리하고 메모리 절약:
as_completed() - 둘 다
return_exceptions=True패턴으로 에러 핸들링 가능
실무에서는 데이터 파이프라인처럼 스트리밍이 필요한 경우가 많아서 as_completed()가 더 유용했다.