Python study

asynico를 이용한 동시성

adulty22 2024. 6. 6. 15:41

asyncio

asyncio는 파이썬에서 비동기 I/O 작업을 작성할 수 있게 해주는 라이브러리이다. 많은 I/O 작업을 수행하는 프로그램(예: 웹 스크래핑, 네트워크 요청 또는 파일 I/O)에서 비동기 실행을 통해 성능과 응답성을 향상시킬 수 있다.

asyncio의 주요 개념

  1. 이벤트 루프(Event Loop):
    • 이벤트 루프는 프로그램에서 발생하는 이벤트(예: I/O 작업 완료)를 관리하고 처리하는 중심 역할을 한다.
    • asyncio에서 이벤트 루프는 여러 비동기 작업을 순차적으로 실행하고 완료될 때까지 대기한다.
  2. 코루틴(Coroutine):
    • 코루틴은 비동기 함수이다. async def로 정의된다.
    • 일반 함수와 달리, 코루틴은 await 키워드를 사용하여 다른 코루틴의 실행을 일시 중지하고 결과를 기다릴 수 있다.
    async def my_coroutine():
        await asyncio.sleep(1)
        print("Hello, asyncio!")
    
  3. 태스크(Task):
    • 태스크는 코루틴을 래핑하여 이벤트 루프에서 실행할 수 있게 해준다.
    • asyncio.create_task() 또는 loop.create_task()를 사용하여 태스크를 생성할 수 있다.
    task = asyncio.create_task(my_coroutine())
    
  4. 퓨처(Future):
    • 퓨처 객체는 나중에 완료될 결과를 나타낸다. 일반적으로 직접 생성하지 않고, 라이브러리 내부에서 사용된다.
    • 태스크도 퓨처의 서브클래스이다.
  5. await:
    • await 키워드는 코루틴 안에서 다른 코루틴이나 퓨처, 또는 다른 비동기 작업의 완료를 기다리는 데 사용된다.

기본 예제

import asyncio

async def say_hello():
    await asyncio.sleep(1)
    print("Hello, World!")

async def main():
    task1 = asyncio.create_task(say_hello())
    task2 = asyncio.create_task(say_hello())

    await task1
    await task2

asyncio.run(main())

주요 기능

  • 비동기 I/O: 파일 읽기/쓰기, 네트워크 소켓 통신 등에서 비동기 처리를 지원한다.
  • 타이머: 일정 시간이 지난 후에 실행되는 작업을 쉽게 작성할 수 있다.
  • 비동기 스트림: 데이터 스트림을 비동기적으로 처리할 수 있다.

실전 활용

asyncio는 웹 서버, 크롤러, 채팅 애플리케이션 등 다양한 분야에서 활용될 수 있다. 예를 들어, 비동기 웹 서버를 작성할 때 aiohttp 라이브러리와 함께 사용할 수 있다.

from aiohttp import web

async def handle(request):
    await asyncio.sleep(1)
    return web.Response(text="Hello, World!")

app = web.Application()
app.add_routes([web.get('/', handle)])

web.run_app(app)

위 예제는 aiohttp를 사용하여 비동기 웹 서버를 설정하고, 클라이언트 요청을 처리하는 간단한 예시이다.

asyncio는 비동기 프로그래밍을 위한 강력한 도구로, 성능과 효율성을 크게 향상시킬 수 있다.

 

@asyncio.coroutine

@asyncio.coroutine 데코레이터는 파이썬의 비동기 프로그래밍을 지원하기 위해 도입된 기능이다. 하지만 파이썬 3.5에서 async와 await 키워드가 도입되면서, 현재는 주로 구식 방식으로 간주된다. 그래도 이해를 돕기 위해 @asyncio.coroutine에 대해 설명하겠다.

@asyncio.coroutine의 개요

@asyncio.coroutine 데코레이터는 코루틴을 정의하는 데 사용되며, 이는 비동기 함수로서 실행 중에 다른 작업으로 전환될 수 있는 함수이다. 이 데코레이터는 yield 키워드와 함께 사용되어 코루틴을 일시 중지하고 나중에 다시 시작할 수 있도록 한다.

사용 예시

import asyncio

@asyncio.coroutine
def my_coroutine():
    yield from asyncio.sleep(1)
    print("Hello, asyncio!")

loop = asyncio.get_event_loop()
loop.run_until_complete(my_coroutine())
loop.close()

위 예제는 1초 동안 잠자고 나서 "Hello, asyncio!"를 출력하는 간단한 코루틴을 정의한다. 여기서 yield from은 await 키워드와 유사하게 동작하며, 다른 코루틴을 호출하고 그 결과를 기다린다.

async와 await 키워드를 사용하는 방식

파이썬 3.5 이후로는 @asyncio.coroutine 데코레이터와 yield from 대신 async와 await 키워드를 사용하는 것이 권장된다. 이 방법은 더 직관적이고 가독성이 높다.

같은 예제를 async와 await 키워드를 사용하여 작성하면 다음과 같다:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    print("Hello, asyncio!")

asyncio.run(my_coroutine())

이 예제는 이전 예제와 동일한 동작을 하지만, 코드가 더 간결하고 명확하다.

  • @asyncio.coroutine은 파이썬의 초기 비동기 프로그래밍 지원 방식으로, yield from과 함께 사용된다.
  • 파이썬 3.5 이후로는 async와 await 키워드를 사용하는 것이 더 권장되며, 이 방법이 더 간편하고 가독성이 좋다.
  • 새로운 코드에서는 async와 await를 사용하는 것이 좋으며, 기존 코드에서만 @asyncio.coroutine을 볼 수 있다.

 

thread 와 asyncio 사용에 대한 차이

thread와 asyncio는 파이썬에서 동시에 작업을 수행하기 위한 두 가지 주요 방법이다. 각각의 접근 방식은 고유한 장점과 단점이 있으며, 사용 사례에 따라 적합한 선택이 달라진다. 이 두 가지를 비교해보겠다.

1. 스레드 (Thread)

스레드는 프로그램 내에서 동시에 실행될 수 있는 작은 단위이다. 파이썬에서 스레드를 사용하면 여러 작업을 병렬로 수행할 수 있다. 이는 특히 CPU 바운드 작업(즉, 많은 계산을 수행하는 작업)에 유용하다.

주요 특징

  • 병렬 실행: 스레드는 여러 CPU 코어를 활용할 수 있다.
  • GIL (Global Interpreter Lock): 파이썬의 GIL은 한 번에 하나의 스레드만 실제로 파이썬 바이트코드를 실행할 수 있게 한다. 따라서 CPU 바운드 작업에서는 GIL이 병목이 될 수 있다.
  • I/O 바운드 작업: 스레드는 I/O 바운드 작업(예: 파일 I/O, 네트워크 I/O)에도 사용할 수 있으며, 이러한 경우 GIL이 큰 문제가 되지 않는다.
import threading
import time

def worker():
    print("Starting worker")
    time.sleep(2)
    print("Worker done")

threads = []
for _ in range(5):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

2. asyncio

asyncio는 비동기 I/O 작업을 관리하기 위한 라이브러리이다. 이는 이벤트 루프를 통해 코루틴을 실행하여 효율적인 I/O 바운드 작업을 수행한다.

주요 특징

  • 단일 스레드: asyncio는 단일 스레드에서 실행되며, 코루틴 간에 빠르게 전환한다.
  • 코루틴 기반: 코루틴은 비동기 함수로, async와 await 키워드를 사용하여 정의된다.
  • I/O 바운드 작업에 최적화: 많은 I/O 바운드 작업을 동시에 처리하는 데 매우 효율적이다.
  • 낮은 오버헤드: 스레드보다 컨텍스트 전환 오버헤드가 적다.
import asyncio

async def worker():
    print("Starting worker")
    await asyncio.sleep(2)
    print("Worker done")

async def main():
    tasks = []
    for _ in range(5):
        task = asyncio.create_task(worker())
        tasks.append(task)

    await asyncio.gather(*tasks)

asyncio.run(main())

차이점 요약

  1. 실행 방식:
    • 스레드: 병렬로 실행됩니다. 각 스레드는 독립적으로 실행되며, CPU 코어를 공유한다.
    • asyncio: 단일 스레드 내에서 비동기적으로 실행된다. 코루틴은 이벤트 루프를 통해 전환된다.
  2. 적용 사례:
    • 스레드: CPU 바운드 작업, I/O 바운드 작업 둘 다 사용 가능. 그러나 GIL 때문에 CPU 바운드 작업에서 효율이 떨어질 수 있다.
    • asyncio: 주로 I/O 바운드 작업에 사용. 단일 스레드에서 많은 작업을 효율적으로 처리할 수 있다.
  3. GIL:
    • 스레드: GIL로 인해 한 번에 하나의 스레드만 실행될 수 있다. I/O 바운드 작업에서는 큰 문제가 아니지만, CPU 바운드 작업에서는 성능 저하를 초래할 수 있다.
    • asyncio: GIL과 관련 없이 단일 스레드 내에서 작동한다. I/O 작업의 효율성을 극대화한다.
  4. 오버헤드:
    • 스레드: 스레드 간의 컨텍스트 전환 오버헤드가 있다.
    • asyncio: 코루틴 전환이 더 가볍고 효율적이다.

결론적으로, CPU 바운드 작업에는 멀티스레딩이나 멀티프로세싱이 더 적합할 수 있으며, 많은 I/O 바운드 작업을 효율적으로 처리해야 하는 경우 asyncio가 더 좋은 선택이 될 수 있다. 각각의 접근 방식은 특정 상황과 요구 사항에 따라 다르게 사용될 수 있다.

 

asyncio 패키지에 대해 구성요소

asyncio는 파이썬에서 비동기 I/O를 처리하기 위한 표준 라이브러리이다. 이 패키지는 이벤트 루프, 코루틴, 태스크, 퓨처 등 다양한 구성 요소를 통해 비동기 프로그램을 효율적으로 작성할 수 있도록 도와준다. 아래에서 asyncio의 주요 구성 요소와 각각의 역할에 대해 자세히 설명하겠다.

주요 구성 요소

  1. 이벤트 루프(Event Loop):
    • 이벤트 루프는 asyncio의 중심이다. 이는 여러 비동기 작업을 관리하고, 각 작업이 완료될 때까지 대기하며, 다음 작업을 실행한다.
    • 주요 메서드:
      • asyncio.get_event_loop(): 현재 이벤트 루프를 반환한다.
      • loop.run_until_complete(): 특정 코루틴이 완료될 때까지 이벤트 루프를 실행한다.
      • loop.run_forever(): 이벤트 루프를 무한히 실행한다.
    import asyncio
    
    async def main():
        print("Hello, World!")
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
  2. 코루틴(Coroutine):
    • 코루틴은 비동기 함수이다. async 키워드로 정의되며, await 키워드를 사용하여 다른 비동기 작업을 일시 중지하고 기다릴 수 있다.
    async def my_coroutine():
        await asyncio.sleep(1)
        print("Hello, asyncio!")
    
  3. 태스크(Task):
    • 태스크는 코루틴을 이벤트 루프에서 실행하기 위해 래핑한 객체이다. 이를 통해 코루틴을 스케줄링하고 실행 상태를 추적할 수 있다.
    • 주요 메서드:
      • asyncio.create_task(): 코루틴을 태스크로 래핑하여 실행한다.
    async def main():
        task = asyncio.create_task(my_coroutine())
        await task
    
  4. 퓨처(Future):
    • 퓨처 객체는 나중에 완료될 결과를 나타낸다. 이는 비동기 작업의 완료 상태를 추적하고, 결과 또는 예외를 저장한다.
    • 주요 메서드:
      • future.result(): 퓨처 객체의 결과를 반환한다.
      • future.set_result(): 퓨처 객체의 결과를 설정한다.
    async def set_after(future):
        await asyncio.sleep(1)
        future.set_result("Completed")
    
    async def main():
        future = asyncio.Future()
        await set_after(future)
        print(future.result())
    
    asyncio.run(main())
  5. 타임아웃 및 딜레이:
    • asyncio.sleep(): 주어진 시간 동안 일시 중지한다.
    • asyncio.wait_for(): 주어진 시간 내에 특정 코루틴이 완료되기를 기다린다.
    async def main():
        try:
            await asyncio.wait_for(my_coroutine(), timeout=0.5)
        except asyncio.TimeoutError:
            print("Timed out!")
    
    asyncio.run(main())
    
  6. 비동기 I/O:
    • asyncio는 파일 I/O, 네트워크 소켓 등을 비동기적으로 처리할 수 있는 다양한 API를 제공한다.
    • 예를 들어, asyncio.StreamReader와 asyncio.StreamWriter를 사용하여 비동기 네트워크 통신을 할 수 있다.
    async def handle_client(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        print(f"Received: {message}")
    
        writer.write(data)
        await writer.drain()
    
        writer.close()
    
    async def main():
        server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
        async with server:
            await server.serve_forever()
    
    asyncio.run(main())
    

주요 API 및 기능

  • asyncio.run(coro): 코루틴을 실행하고 완료될 때까지 대기한다.
  • asyncio.gather(*coros): 여러 코루틴을 동시에 실행하고, 모든 코루틴이 완료될 때까지 기다린다.
  • asyncio.shield(coro): 특정 코루틴을 취소되지 않도록 보호한다.
  • asyncio.Event: 비동기 이벤트 객체이다. 한 태스크가 이벤트를 설정(set)하고 다른 태스크가 이를 기다릴 수 있다.
  • asyncio.Queue: 비동기 큐로, 태스크 간의 데이터를 교환하는 데 사용된다.

예제

간단한 예제로 여러 I/O 작업을 비동기적으로 처리하는 프로그램을 작성해보겠다.

import asyncio

async def fetch_data(id):
    print(f"Fetching data for {id}...")
    await asyncio.sleep(1)
    print(f"Data for {id} fetched!")
    return f"Data {id}"

async def main():
    tasks = [asyncio.create_task(fetch_data(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print("All data fetched:", results)

asyncio.run(main())

위의 예제는 5개의 비동기 작업을 동시에 실행하고, 모든 작업이 완료될 때까지 기다린 후 결과를 출력한다.

asyncio는 다양한 비동기 작업을 효율적으로 처리할 수 있는 강력한 도구이다. 이를 통해 네트워크 I/O, 파일 I/O, 타이머, 이벤트 등의 작업을 비동기적으로 수행할 수 있으며, 이를 적절히 활용하면 프로그램의 성능과 응답성을 크게 향상시킬 수 있다.

 

asyncio 와 aiohttp로 내려받기

aiohttp는 비동기 HTTP 클라이언트/서버 라이브러리로, HTTP 요청을 비동기적으로 처리할 수 있게 해준다. 이를 asyncio와 함께 사용하면 동시에 여러 파일을 효율적으로 다운로드할 수 있다.

주요 구성 요소

  1. asyncio: 비동기 I/O 작업을 관리하는 라이브러리이다.
  2. aiohttp: 비동기 HTTP 클라이언트로, HTTP 요청을 비동기적으로 처리할 수 있다.

설치

먼저 aiohttp를 설치해야 한다. 다음 명령어를 사용하여 설치할 수 있다:

pip install aiohttp

비동기 파일 다운로드 예제

다음은 asyncio와 aiohttp를 사용하여 여러 파일을 동시에 다운로드하는 예제이다.

import aiohttp
import asyncio

async def download_file(session, url, file_path):
    async with session.get(url) as response:
        if response.status == 200:
            with open(file_path, 'wb') as f:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f.write(chunk)
            print(f"Downloaded {file_path}")
        else:
            print(f"Failed to download {url}")

async def main(urls, file_paths):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url, file_path in zip(urls, file_paths):
            task = asyncio.create_task(download_file(session, url, file_path))
            tasks.append(task)
        await asyncio.gather(*tasks)

# 다운로드할 URL 목록과 저장할 파일 경로 목록
urls = [
    '<http://example.com/file1.txt>',
    '<http://example.com/file2.txt>',
    '<http://example.com/file3.txt>',
]

file_paths = [
    'file1.txt',
    'file2.txt',
    'file3.txt',
]

# 비동기 다운로드 실행
asyncio.run(main(urls, file_paths))

코드 설명

  1. download_file 함수:
    • session.get(url)을 사용하여 URL에서 파일을 비동기적으로 요청한다.
    • 응답 상태가 200(성공)인 경우, 파일을 다운로드하여 저장한다.
    • 파일을 읽어들일 때, 한 번에 1024바이트씩 읽어와서 파일에 쓴다. 이는 메모리 사용을 최적화하는 데 도움이 된다.
  2. main 함수:
    • aiohttp.ClientSession()을 사용하여 HTTP 세션을 생성한다.
    • 각 URL과 파일 경로에 대해 download_file 함수를 태스크로 생성하고, 이를 tasks 리스트에 추가한다.
    • asyncio.gather(*tasks)를 사용하여 모든 다운로드 태스크가 완료될 때까지 기다린다.
  3. URL과 파일 경로:
    • 다운로드할 URL 목록과 해당 파일을 저장할 경로 목록을 정의한다.
    • asyncio.run(main(urls, file_paths))을 호출하여 비동기 다운로드를 실행한다.

추가 고려 사항

  1. 예외 처리:
    • 네트워크 오류나 파일 쓰기 오류에 대해 예외 처리를 추가하여 코드가 강력해질 수 있다.
    async def download_file(session, url, file_path):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
                print(f"Downloaded {file_path}")
        except aiohttp.ClientError as e:
            print(f"Failed to download {url}: {e}")
    
  2. 시간 제한:
    • 특정 시간 내에 다운로드가 완료되지 않으면 타임아웃 처리를 할 수 있다.
    async def download_file(session, url, file_path, timeout):
        try:
            async with session.get(url, timeout=timeout) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
                print(f"Downloaded {file_path}")
        except asyncio.TimeoutError:
            print(f"Download timed out for {url}")
        except aiohttp.ClientError as e:
            print(f"Failed to download {url}: {e}")
    
  3. 병렬 다운로드 개수 제한:
    • 너무 많은 파일을 동시에 다운로드하면 시스템 자원이 부족해질 수 있다. asyncio.Semaphore를 사용하여 동시에 실행되는 태스크 수를 제한할 수 있다.
    async def download_file(semaphore, session, url, file_path):
        async with semaphore:
            async with session.get(url) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
                print(f"Downloaded {file_path}")
    
    async def main(urls, file_paths, max_concurrent_downloads):
        semaphore = asyncio.Semaphore(max_concurrent_downloads)
        async with aiohttp.ClientSession() as session:
            tasks = []
            for url, file_path in zip(urls, file_paths):
                task = asyncio.create_task(download_file(semaphore, session, url, file_path))
                tasks.append(task)
            await asyncio.gather(*tasks)
    
    urls = [...]
    file_paths = [...]
    asyncio.run(main(urls, file_paths, max_concurrent_downloads=3))
    

이 예제는 asyncio와 aiohttp를 사용하여 비동기적으로 파일을 다운로드하는 방법을 보여준다. 이를 통해 네트워크 I/O를 효율적으로 처리하고, 동시에 여러 파일을 효과적으로 다운로드할 수 있다.

 

블로킹 호출을 에둘러 실행하기

비동기 프로그래밍 환경에서 블로킹 호출을 에둘러 실행하는 것은 매우 중요하다. 블로킹 호출은 현재 실행 중인 작업을 멈추고 해당 호출이 완료될 때까지 기다리는 작업을 의미한다. 이는 비동기 프로그래밍의 주요 장점인 동시성을 저해할 수 있다. 따라서 블로킹 호출을 비동기 환경에서 에둘러 실행하기 위해 다양한 기법과 전략을 사용할 수 있다. 아래에서는 이러한 개념과 이론에 대해 자세히 설명하겠다.

블로킹 호출과 비동기 프로그래밍

비동기 프로그래밍의 핵심은 이벤트 루프를 사용하여 동시에 여러 작업을 처리하는 것입니다. 이벤트 루프는 비동기 작업을 관리하고, 각 작업이 완료될 때까지 대기하면서 다른 작업을 계속 진행합니다. 그러나 블로킹 호출이 발생하면 해당 호출이 완료될 때까지 이벤트 루프가 멈추게 되며, 이는 다른 비동기 작업의 진행을 방해합니다.

블로킹 호출을 에둘러 실행하는 방법

  1. 스레드 풀 사용 (Thread Pool)
    • 블로킹 호출을 별도의 스레드에서 실행하여 이벤트 루프가 계속 진행되도록 한다.
    • concurrent.futures.ThreadPoolExecutor와 asyncio의 run_in_executor 메서드를 사용하여 구현할 수 있다.
    import asyncio
    import concurrent.futures
    import time
    
    def blocking_task():
        time.sleep(5)
        return "Blocking task completed"
    
    async def main():
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, blocking_task)
            print(result)
    
    asyncio.run(main())
    
  2. 프로세스 풀 사용 (Process Pool)
    • CPU 바운드 블로킹 작업의 경우, 프로세스 풀을 사용하여 멀티프로세싱을 통해 작업을 분산시킬 수 있다.
    • concurrent.futures.ProcessPoolExecutor와 함께 사용할 수 있다.
    import asyncio
    import concurrent.futures
    import time
    
    def cpu_bound_task():
        time.sleep(5)
        return "CPU-bound task completed"
    
    async def main():
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, cpu_bound_task)
            print(result)
    
    asyncio.run(main())
    
  3. 비동기 I/O 바운드 작업
    • 가능한 경우, 블로킹 I/O 작업을 비동기 I/O 작업으로 전환하는 것이 좋다.
    • 예를 들어, aiohttp 라이브러리를 사용하여 네트워크 요청을 비동기적으로 처리할 수 있다.
    import aiohttp
    import asyncio
    
    async def fetch(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    
    async def main():
        result = await fetch('<http://example.com>')
        print(result)
    
    asyncio.run(main())
    

블로킹 호출을 비동기적으로 실행하는 전략의 장점

  1. 동시성 향상: 블로킹 호출이 이벤트 루프를 방해하지 않으므로 다른 비동기 작업이 계속 실행될 수 있다.
  2. 응답성 향상: 비동기 프로그래밍 환경에서 응답성을 유지할 수 있다. 특히 사용자 인터페이스를 포함한 애플리케이션에서 중요하다.
  3. 자원 효율성: 비동기 I/O 작업을 사용하면 스레드나 프로세스를 추가로 생성하지 않아도 되므로 자원을 효율적으로 사용할 수 있다.

이론적 배경

  • 이벤트 루프: 비동기 프로그래밍의 핵심으로, 비동기 작업을 관리하고, 완료된 작업을 처리하는 역할을 한다.
  • 컨텍스트 전환: 스레드나 프로세스를 전환하는 작업으로, 오버헤드가 발생할 수 있다. 따라서 가능한 한 이벤트 루프 내에서 비동기 작업을 처리하는 것이 효율적이다.
  • 멀티스레딩과 멀티프로세싱: 블로킹 작업을 비동기적으로 처리하기 위한 일반적인 접근 방식이다. 각각의 장단점이 있으며, 작업의 특성에 따라 적절한 방법을 선택해야 한다.

블로킹 호출을 에둘러 실행하는 것은 비동기 프로그래밍 환경에서 매우 중요하다. 이를 통해 프로그램의 동시성과 응답성을 유지할 수 있다. asyncio와 같은 비동기 라이브러리와 concurrent.futures와 같은 멀티스레딩/멀티프로세싱 도구를 사용하면 효율적으로 블로킹 작업을 처리할 수 있다. 각각의 방법과 전략을 상황에 맞게 활용하여 비동기 프로그래밍의 이점을 최대한 살리는 것이 좋다.

 

asyncio.as_completed

asyncio.as_completed는 비동기 프로그래밍에서 여러 태스크를 동시에 실행하고, 각 태스크가 완료되는 순서대로 결과를 얻을 수 있게 해주는 함수이다. 이는 태스크의 완료 순서에 상관없이, 완료되는 대로 결과를 처리해야 할 때 매우 유용하다.

주요 특징 및 사용법

  1. 동시 실행:
    • 여러 비동기 작업을 동시에 실행한다.
    • 모든 작업이 완료될 때까지 기다리지 않고, 하나의 작업이 완료되면 즉시 결과를 처리할 수 있다.
  2. 결과 순서:
    • 입력된 태스크의 완료 순서대로 결과를 반환한다. 즉, 먼저 완료된 태스크의 결과가 먼저 반환된다.

사용법

asyncio.as_completed는 다음과 같이 사용된다:

  1. 비동기 작업을 정의한다.
  2. asyncio.as_completed를 사용하여 비동기 작업을 실행하고, 각 작업이 완료될 때마다 결과를 처리한다.

예제

아래는 asyncio.as_completed를 사용하는 예제이다. 이 예제에서는 여러 URL을 동시에 요청하고, 각 요청이 완료되는 대로 결과를 출력한다.

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main(urls):
    tasks = [fetch(url) for url in urls]

    for future in asyncio.as_completed(tasks):
        result = await future
        print(result[:100])  # 첫 100자를 출력

urls = [
    '<http://example.com>',
    '<http://example.org>',
    '<http://example.net>',
]

asyncio.run(main(urls))

코드 설명

  1. fetch 함수:
    • aiohttp 라이브러리를 사용하여 URL에서 데이터를 비동기적으로 가져온다.
  2. main 함수:
    • 요청할 URL 목록을 받아서, 각 URL에 대한 fetch 함수를 태스크로 생성한다.
    • asyncio.as_completed를 사용하여 태스크 리스트를 전달한다.
    • 각 태스크가 완료될 때마다 await future를 통해 결과를 받아 처리한다.

주요 포인트

  • 비동기 작업의 동시 실행:
    • 여러 URL을 동시에 요청하여 병렬로 작업을 처리할 수 있다.
  • 작업 완료 순서대로 결과 처리:
    • asyncio.as_completed는 태스크가 완료되는 대로 결과를 반환하므로, 결과가 준비되는 대로 즉시 처리할 수 있다.
  • 유용한 시나리오:
    • 다수의 비동기 작업이 있을 때, 각 작업의 완료 순서에 관계없이 완료되는 즉시 결과를 처리하고자 할 때 유용하다.
    • 예를 들어, 여러 데이터 소스에서 데이터를 가져와야 하는 경우, 먼저 완료된 데이터부터 바로 처리할 수 있다.

추가 예제: 작업이 완료될 때마다 상태 업데이트

아래 예제는 여러 비동기 작업을 동시에 실행하고, 각 작업이 완료될 때마다 상태를 업데이트한다.

import asyncio

async def my_task(id):
    await asyncio.sleep(id)
    return f"Task {id} completed"

async def main():
    tasks = [my_task(i) for i in range(5)]

    for future in asyncio.as_completed(tasks):
        result = await future
        print(result)

asyncio.run(main())

설명:

  • my_task 함수:
    • 태스크 ID를 받아서 해당 ID만큼 asyncio.sleep으로 대기한 후 완료 메시지를 반환한다.
  • main 함수:
    • 5개의 태스크를 생성하여 asyncio.as_completed에 전달한다.
    • 각 태스크가 완료될 때마다 결과를 출력한다.

이 예제에서는 ID가 작은 태스크가 먼저 완료되므로, 완료된 순서대로 출력되는 것을 확인할 수 있다.

asyncio.as_completed는 여러 비동기 태스크를 동시에 실행하고, 완료되는 대로 결과를 처리할 수 있게 해주는 강력한 도구입니다. 이를 사용하면 비동기 작업의 동시성을 최대한 활용하면서, 각 작업이 완료되는 순서대로 결과를 효율적으로 처리할 수 있습니다. 이는 특히 네트워크 요청이나 I/O 작업에서 유용합니다.

 

executor를 이용해서 이벤트 루프 블로킹 피하기

asyncio를 사용하는 비동기 프로그래밍에서 이벤트 루프가 블로킹되지 않도록 하기 위해 executor를 사용할 수 있다. 이는 특히 CPU 바운드 작업이나 블로킹 I/O 작업을 비동기적으로 실행할 때 유용하다. executor를 사용하면 이러한 작업을 별도의 스레드나 프로세스에서 실행하여 이벤트 루프가 다른 비동기 작업을 계속 수행할 수 있게 한다.

주요 개념

  1. 이벤트 루프 (Event Loop):
    • 비동기 작업을 관리하고 실행하는 중심 요소이다.
    • 블로킹 작업이 이벤트 루프에서 실행되면 다른 비동기 작업이 지연될 수 있다.
  2. Executor:
    • 블로킹 작업을 비동기적으로 실행하기 위해 사용되는 스레드 풀이나 프로세스 풀이다.
    • concurrent.futures.ThreadPoolExecutor와 concurrent.futures.ProcessPoolExecutor가 대표적이다.

executor 사용 방법

  1. ThreadPoolExecutor:
    • 블로킹 I/O 작업이나 비교적 가벼운 CPU 바운드 작업에 적합하다.
  2. ProcessPoolExecutor:
    • 무거운 CPU 바운드 작업에 적합하다. 각 작업이 별도의 프로세스에서 실행되기 때문에 GIL(Global Interpreter Lock)의 영향을 받지 않는다.

ThreadPoolExecutor 사용 예제

아래는 ThreadPoolExecutor를 사용하여 블로킹 작업을 비동기적으로 실행하는 예제이다.

import asyncio
import concurrent.futures
import time

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    time.sleep(5)  # 블로킹 I/O 작업 (예: 파일 읽기/쓰기)
    print(f"end blocking_io at {time.strftime('%X')}")

async def main():
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # 블로킹 작업을 스레드 풀에서 실행
        await loop.run_in_executor(pool, blocking_io)

asyncio.run(main())

ProcessPoolExecutor 사용 예제

아래는 ProcessPoolExecutor를 사용하여 CPU 바운드 작업을 비동기적으로 실행하는 예제이다.

import asyncio
import concurrent.futures
import time

def cpu_bound():
    print(f"start cpu_bound at {time.strftime('%X')}")
    count = 0
    for _ in range(10**7):
        count += 1  # CPU 바운드 작업
    print(f"end cpu_bound at {time.strftime('%X')}")
    return count

async def main():
    loop = asyncio.get_running_loop()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        # CPU 바운드 작업을 프로세스 풀에서 실행
        result = await loop.run_in_executor(pool, cpu_bound)
        print(f"Result: {result}")

asyncio.run(main())

코드 설명

  1. blocking_io 함수:
    • 블로킹 I/O 작업을 시뮬레이트한다. time.sleep(5)를 사용하여 5초 동안 대기한다.
  2. cpu_bound 함수:
    • CPU 바운드 작업을 시뮬레이트한다. 큰 루프를 실행하여 CPU를 사용한다.
  3. main 함수:
    • 현재 실행 중인 이벤트 루프를 가져온다.
    • concurrent.futures.ThreadPoolExecutor 또는 ProcessPoolExecutor를 사용하여 블로킹 작업을 실행한다.
    • loop.run_in_executor를 사용하여 블로킹 작업을 비동기적으로 실행하고, 완료될 때까지 기다린다.

실행 흐름

  1. 이벤트 루프 시작:
    • asyncio.run(main())을 호출하여 이벤트 루프를 시작한다.
  2. 스레드 풀 또는 프로세스 풀에서 작업 실행:
    • loop.run_in_executor(pool, blocking_io) 또는 loop.run_in_executor(pool, cpu_bound)를 호출하여 블로킹 작업을 스레드 풀 또는 프로세스 풀에서 실행한다.
  3. 블로킹 작업 비동기 실행:
    • 블로킹 작업이 별도의 스레드 또는 프로세스에서 실행되므로, 이벤트 루프가 다른 비동기 작업을 계속 수행할 수 있다.
  4. 작업 완료 후 결과 처리:
    • 작업이 완료되면, 결과를 처리한다.

asyncio와 executor를 사용하면 블로킹 작업을 비동기적으로 처리하여 이벤트 루프가 다른 비동기 작업을 계속 수행할 수 있게 할 수 있다. 이는 비동기 프로그램의 동시성을 유지하고 성능을 최적화하는 데 매우 유용하다. 스레드 풀과 프로세스 풀을 적절히 사용하여 블로킹 작업을 효율적으로 처리할 수 있다.

 

콜백지옥

"콜백 지옥"은 비동기 프로그래밍에서 콜백 함수를 중첩해서 사용하다 보니 코드의 가독성과 유지보수성이 떨어지는 현상을 가리키는 용어이다. 콜백 지옥은 특히 자바스크립트와 같은 비동기 프로그래밍이 많이 사용되는 언어에서 자주 언급된다. 하지만 파이썬에서도 비동기 프로그래밍을 할 때 콜백을 많이 사용하면 비슷한 문제가 발생할 수 있다.

콜백 지옥의 특징

  1. 중첩된 콜백: 콜백 함수가 여러 단계로 중첩되면서 코드가 우측으로 계속해서 밀려나는 현상.
  2. 가독성 저하: 코드의 흐름을 따라가기가 어려워지고, 코드를 읽거나 디버깅하기가 어려워진다.
  3. 유지보수 어려움: 코드가 복잡해지면서 새로운 기능을 추가하거나 기존 기능을 수정하는 것이 어렵다.

콜백 지옥 예제

다음은 콜백 지옥의 예시이다. 이 코드는 비동기 함수 호출이 중첩되어 가독성이 매우 떨어진다.

import asyncio


def callback1(future):
    print("Callback 1")
    future2 = asyncio.Future()
    future2.add_done_callback(callback2)
    loop = asyncio.get_running_loop()
    loop.call_later(1, future2.set_result, "Result 2")


def callback2(future):
    print("Callback 2")
    future3 = asyncio.Future()
    future3.add_done_callback(callback3)
    loop = asyncio.get_running_loop()
    loop.call_later(1, future3.set_result, "Result 3")


def callback3(future):
    print("Callback 3")
    print(future.result())


async def main():
    future1 = asyncio.Future()
    future1.add_done_callback(callback1)
    loop = asyncio.get_running_loop()
    loop.call_later(1, future1.set_result, "Result 1")
    await asyncio.sleep(3)  # 이벤트 루프가 콜백을 실행할 시간을 주기 위해 대기

asyncio.run(main())
'''
Callback 1
Callback 2
Callback 3
Result 3
'''

이 예제에서는 콜백이 여러 단계로 중첩되어 있다. 콜백이 완료되면 다음 콜백이 실행되고, 이러한 과정이 반복된다. 코드의 흐름을 따라가기가 매우 어렵고, 가독성이 떨어진다.

콜백 지옥을 피하는 방법

  1. async/await 사용:
    • async/await 키워드를 사용하면 콜백 지옥을 피하고, 코드를 더 직관적으로 작성할 수 있다.
  2. 비동기 함수 체이닝:
    • 비동기 함수를 체이닝하여 연속적으로 실행할 수 있다.

async/await를 사용한 예제

위의 예제를 async/await를 사용하여 다시 작성하면 가독성이 훨씬 좋아진다.

import asyncio

async def main():
    print("Callback 1")
    await asyncio.sleep(1)

    print("Callback 2")
    await asyncio.sleep(1)

    print("Callback 3")
    result = "Result 3"
    print(result)

asyncio.run(main())

이 예제에서는 비동기 함수 main에서 await 키워드를 사용하여 각 단계의 비동기 작업을 기다린다. 코드가 직관적이며, 콜백 지옥 없이 비동기 작업을 순차적으로 실행할 수 있다.

콜백 지옥은 비동기 프로그래밍에서 콜백 함수의 과도한 중첩으로 인해 발생하는 문제이다. 이를 피하기 위해 파이썬에서는 async/await 키워드를 사용하는 것이 좋다. 이는 코드를 더 직관적이고 가독성 있게 만들어 주며, 유지보수를 쉽게 할 수 있도록 도와준다. 비동기 프로그래밍을 할 때는 가능한 한 콜백 대신 async/await를 사용하는 것이 좋은 선택이다.

 

모든 코드는 github에 저장되어 있습니다.

https://github.com/SeongUk18/python

728x90