Python 비동기 처리에서 asyncio.gather 예외 처리 문제

문제 상황

사용자 데이터를 여러 외부 API에서 가져오는 배치 작업을 구현 중이었다. 100개 이상의 요청을 순차적으로 처리하면 너무 느려서 asyncio로 동시 처리하도록 변경했는데, 일부 요청이 실패하면 전체가 중단되는 문제가 발생했다.

import asyncio
import aiohttp

async def fetch_user_data(session, user_id):
    async with session.get(f'https://api.example.com/users/{user_id}') as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user_data(session, uid) for uid in range(100)]
        results = await asyncio.gather(*tasks)

이 코드는 하나의 요청이라도 예외를 발생시키면 gather가 즉시 예외를 raise하고 나머지 작업은 취소된다.

해결 방법

return_exceptions=True 파라미터를 추가하면 예외를 raise하지 않고 결과 리스트에 포함시킨다.

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user_data(session, uid) for uid in range(100)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f'User {i} failed: {result}')
            else:
                # 정상 처리
                process_user(result)

추가 고려사항

동시 요청 수를 제한하기 위해 Semaphore를 사용했다. 외부 API에 부하를 주지 않으면서도 효율적으로 처리할 수 있었다.

async def fetch_with_semaphore(sem, session, user_id):
    async with sem:
        return await fetch_user_data(session, user_id)

async def main():
    sem = asyncio.Semaphore(10)  # 최대 10개 동시 요청
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_semaphore(sem, session, uid) for uid in range(100)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

처리 시간이 순차 실행 대비 1/8 수준으로 단축되었다. asyncio 문서를 제대로 읽어봤어야 했는데 삽질을 좀 했다.

Python 비동기 처리에서 asyncio.gather 예외 처리 문제