Future 클래스
Future 클래스는 비동기 프로그래밍에서 중요한 역할을 하는 클래스이다. 주로 concurrent.futures 모듈이나 asyncio 모듈에서 사용된다. 이 클래스는 비동기 작업의 상태와 결과를 추적하는 데 사용된다.
concurrent.futures.Future
concurrent.futures.Future 클래스는 ThreadPoolExecutor나 ProcessPoolExecutor와 같은 실행자(executor)에서 비동기 작업의 결과를 나타낸다. Future는 비동기 작업의 상태를 나타내는 객체로, 작업이 완료될 때까지 기다리거나 작업의 결과 또는 예외를 얻는 기능을 제공한다. 이 객체는 비동기 작업의 실행을 추적하고, 완료 시 콜백을 실행하거나 결과를 반환할 수 있도록 한다. 이 클래스는 아래와 같은 메서드와 속성을 제공한다:
- cancel(): 작업이 아직 실행되지 않았다면 작업을 취소하고, 성공적으로 취소되면 True를 반환한다.
- cancelled(): 작업이 취소되었는지 여부를 반환한다.
- running(): 작업이 현재 실행 중인지 여부를 반환한다.
- done(): 작업이 완료되었는지 여부를 반환한다.
- result(timeout=None): 작업의 결과를 반환한다. 작업이 아직 완료되지 않았다면 완료될 때까지 기다린다.
- exception(timeout=None): 작업 중 발생한 예외를 반환한다. 작업이 아직 완료되지 않았다면 완료될 때까지 기다린다.
- add_done_callback(fn): 작업이 완료되면 호출할 콜백 함수를 추가한다.
import concurrent.futures
def square(n):
return n * n
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(square, 10)
result = future.result()
print(result) # Output: 100
concurrent.futures.Future 작동 방식
concurrent.futures.Future 클래스는 비동기 작업의 상태를 추적하기 위해 여러 상태를 가진다:
- Pending: 작업이 아직 시작되지 않은 상태이다.
- Running: 작업이 실행 중인 상태이다.
- Done: 작업이 완료된 상태이다. 작업이 성공적으로 완료되었거나 예외가 발생한 경우 모두 포함한다.
- Cancelled: 작업이 취소된 상태이다.
작업의 상태는 내부적으로 Future 객체의 메서드를 통해 변경된다. 예를 들어, executor.submit() 메서드는 Future 객체를 반환하고, 이 객체를 통해 작업의 상태를 추적할 수 있다. 작업이 완료되면 set_result 또는 set_exception 메서드가 호출되어 작업의 결과나 예외를 설정한다.
다음은 concurrent.futures.Future 클래스의 작동 원리를 설명하는 예제이다:
import concurrent.futures
import time
def slow_function(seconds):
time.sleep(seconds)
return f"Completed after {seconds} seconds"
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(slow_function, 3)
print(future.done()) # False
time.sleep(1)
print(future.done()) # False
result = future.result() # This will block until the result is available
print(result) # Completed after 3 seconds
asyncio.Future
asyncio.Future 클래스는 asyncio 모듈에서 비동기 작업의 결과를 나타내는 데 사용된다. await 키워드를 사용하여 비동기 함수 내에서 작업이 완료될 때까지 기다릴 수 있다. 주요 메서드와 속성은 다음과 같다:
- cancel(): 작업을 취소한다.
- cancelled(): 작업이 취소되었는지 여부를 반환한다.
- done(): 작업이 완료되었는지 여부를 반환한다.
- result(): 작업의 결과를 반환한다. 작업이 아직 완료되지 않았다면 완료될 때까지 기다린다.
- exception(): 작업 중 발생한 예외를 반환한다. 작업이 아직 완료되지 않았다면 완료될 때까지 기다린다.
- add_done_callback(fn): 작업이 완료되면 호출할 콜백 함수를 추가한다.
import asyncio
async def main():
future = asyncio.Future()
async def set_future():
await asyncio.sleep(1)
future.set_result('Completed')
asyncio.create_task(set_future())
result = await future
print(result) # Output: Completed
asyncio.run(main())
asyncio.Future 작동 방식
asyncio.Future 클래스는 비동기 함수와 이벤트 루프에서 비동기 작업의 상태를 추적하는 데 사용된다. 이 클래스는 async/await 구문을 사용하여 비동기 작업을 관리한다. asyncio.Future 객체는 다음과 같은 상태를 가진다:
- Pending: 작업이 아직 시작되지 않은 상태이다.
- Done: 작업이 완료된 상태이다. 작업이 성공적으로 완료되었거나 예외가 발생한 경우 모두 포함한다.
- Cancelled: 작업이 취소된 상태이다.
asyncio.Future 객체는 이벤트 루프에서 실행되는 비동기 작업의 결과를 설정하거나 예외를 설정할 수 있다. set_result 또는 set_exception 메서드를 호출하여 작업의 결과나 예외를 설정할 수 있다. await 키워드를 사용하면 작업이 완료될 때까지 기다릴 수 있다.
주요 차이점
- concurrent.futures.Future는 주로 스레드 또는 프로세스 풀에서 비동기 작업을 관리하는 데 사용된다.
- asyncio.Future는 비동기 함수와 이벤트 루프를 사용한 비동기 작업에서 사용된다.
이 두 클래스는 비동기 작업의 결과를 추적하고 관리하는 데 있어 유사한 기능을 제공하지만, 사용되는 컨텍스트에 따라 다르다. concurrent.futures는 스레드와 프로세스를 통한 병렬 작업을 위해 설계되었고, asyncio는 코루틴과 이벤트 루프를 사용한 비동기 프로그래밍을 위해 설계되었다.
상태 전환 다이어그램
Future 객체의 상태 전환을 도식화하면 다음과 같다:
- Pending → Running: 작업이 시작될 때 상태가 변경된다 (concurrent.futures에서만 해당).
- Pending → Done: 작업이 완료되면 상태가 변경된다.
- Pending → Cancelled: 작업이 취소되면 상태가 변경된다.
- Running → Done: 작업이 완료되면 상태가 변경된다 (concurrent.futures에서만 해당).
- Running → Cancelled: 작업이 취소되면 상태가 변경된다 (concurrent.futures에서만 해당).
- concurrent.futures.Future는 스레드 또는 프로세스 풀에서 비동기 작업의 상태와 결과를 추적한다.
- asyncio.Future는 asyncio 이벤트 루프에서 비동기 작업의 상태와 결과를 추적한다.
- 두 클래스 모두 작업이 완료되었는지 여부를 확인하고, 결과나 예외를 설정하거나 반환하는 메서드를 제공한다.
- Future 객체의 상태 전환은 Pending, Running, Done, Cancelled와 같이 정의되며, 이는 작업의 현재 상태를 나타낸다.
이론적으로, Future 클래스는 비동기 프로그래밍의 핵심 구성 요소로, 비동기 작업의 상태를 추적하고 관리하는 데 매우 유용한다.
concurrent.futures로 프로세스 실행하기
concurrent.futures 모듈을 사용하여 프로세스를 실행하면 여러 프로세스를 병렬로 실행하여 CPU 집약적인 작업을 효율적으로 처리할 수 있다. 이를 위해 concurrent.futures 모듈의 ProcessPoolExecutor 클래스를 사용한다. 이 클래스는 별도의 프로세스를 생성하고 이를 통해 작업을 병렬로 실행할 수 있도록 도와준다.
ProcessPoolExecutor의 주요 개념과 사용법
주요 개념
- 프로세스 풀: ProcessPoolExecutor는 고정된 수의 프로세스를 미리 생성하고 작업을 이 프로세스들에 분배한다. 이는 프로세스 생성의 오버헤드를 줄이고 병렬 처리를 효율적으로 수행할 수 있게 한다.
- 비동기 작업 제출: 작업은 submit 메서드를 통해 제출되며, 각 작업은 별도의 프로세스에서 실행된다.
- Future 객체: submit 메서드는 Future 객체를 반환하며, 이 객체를 통해 작업의 상태를 추적하고 결과를 얻을 수 있다.
- 컨텍스트 관리자: ProcessPoolExecutor는 컨텍스트 관리자(with 구문)로 사용되어 자원 관리를 자동으로 처리한다.
사용법
- ProcessPoolExecutor 생성: ProcessPoolExecutor 객체를 생성할 때 풀의 크기를 지정할 수 있다.
- 작업 제출: submit 메서드를 사용하여 작업을 제출한다. 이 메서드는 Future 객체를 반환한다.
- 결과 확인: Future 객체의 result 메서드를 호출하여 작업 결과를 얻는다. 작업이 완료될 때까지 기다리게 된다.
- 콜백 추가: add_done_callback 메서드를 사용하여 작업이 완료되면 호출될 콜백 함수를 등록할 수 있다.
다음은 concurrent.futures.ProcessPoolExecutor를 사용하여 병렬로 작업을 실행하는 예제이다:
import concurrent.futures
import os
def compute_square(n):
print(f"Process ID: {os.getpid()} computing {n} * {n}")
return n * n
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5]
# ProcessPoolExecutor를 사용하여 병렬로 작업 실행
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
# 작업 제출
futures = [executor.submit(compute_square, num) for num in numbers]
# 결과 가져오기
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
'''
Process ID: 12345 computing 1 * 1
Process ID: 12346 computing 2 * 2
Process ID: 12347 computing 3 * 3
Process ID: 12345 computing 4 * 4
Process ID: 12346 computing 5 * 5
Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
'''
- ProcessPoolExecutor(max_workers=3): 최대 3개의 프로세스를 사용할 수 있는 풀을 생성한다.
- executor.submit(compute_square, num): 각 숫자에 대해 compute_square 함수를 병렬로 실행하도록 작업을 제출한다.
- concurrent.futures.as_completed(futures): 작업이 완료된 순서대로 Future 객체를 반환하는 이터레이터를 생성한다.
- future.result(): 각 작업의 결과를 출력한다.
참고 사항
- GIL: Python의 Global Interpreter Lock(GIL) 때문에 스레드를 사용하는 ThreadPoolExecutor는 CPU 집약적인 작업에 효율적이지 않다. 반면, ProcessPoolExecutor는 각 작업이 별도의 프로세스에서 실행되기 때문에 GIL의 영향을 받지 않아 CPU 집약적인 작업에 더 적합하다.
- 프로세스 생성 오버헤드: 프로세스를 생성하는 데는 일정한 오버헤드가 있으므로, 매우 짧은 작업보다는 상대적으로 긴 작업에 ProcessPoolExecutor가 더 효율적이다.
이처럼 ProcessPoolExecutor를 사용하면 Python에서 여러 프로세스를 활용한 병렬 처리를 손쉽게 구현할 수 있다.
Executor.map
concurrent.futures 모듈의 Executor 클래스는 ThreadPoolExecutor와 ProcessPoolExecutor를 포함하며, 이 두 클래스는 공통적으로 map 메서드를 제공한다. map 메서드는 여러 입력 값을 하나의 함수에 병렬로 적용하는 기능을 제공한다.
Executor.map 메서드의 개요
Executor.map 메서드는 주어진 함수와 반복 가능한(iterable) 입력 값을 받아 함수의 결과를 반환하는 반복자를 생성한다. 이는 map 함수와 유사하지만, 주어진 함수가 병렬로 실행된다는 점에서 차이가 있다.
주요 특징
- 병렬 실행: 입력 값에 대해 함수를 병렬로 실행한다. ThreadPoolExecutor는 스레드를 사용하고, ProcessPoolExecutor는 프로세스를 사용하여 병렬로 작업을 수행한다.
- 반복자 반환: map 메서드는 각 입력 값에 대해 함수의 결과를 순서대로 포함하는 반복자를 반환한다.
- 블로킹: 기본적으로 map 메서드는 모든 작업이 완료될 때까지 블록한다. 즉, 모든 작업이 끝날 때까지 기다렸다가 결과를 반환한다.
사용법
- Executor.map(func, *iterables, timeout=None, chunksize=1)
- func: 각 입력 값에 적용할 함수.
- iterables: 함수에 전달할 입력 값의 반복 가능한 객체(들). 다중 반복 가능한 객체를 전달할 수 있다.
- timeout: (선택 사항) 각 작업의 최대 실행 시간을 지정한다.
- chunksize: (선택 사항) 각 작업을 제출할 때 사용할 청크 크기를 지정한다. ProcessPoolExecutor에서만 사용된다.
ThreadPoolExecutor 예제
import concurrent.futures
def compute_square(n):
return n * n
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(compute_square, numbers)
for result in results:
print(result)
ProcessPoolExecutor 예제
import concurrent.futures
def compute_square(n):
return n * n
numbers = [1, 2, 3, 4, 5]
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(compute_square, numbers)
for result in results:
print(result)
- ThreadPoolExecutor 예제:
- ThreadPoolExecutor(max_workers=3)는 최대 3개의 스레드를 사용하여 작업을 병렬로 실행한다.
- executor.map(compute_square, numbers)는 compute_square 함수를 numbers 리스트의 각 요소에 대해 병렬로 실행한다.
- results는 map 메서드가 반환하는 반복자이다. 각 요소에 대해 compute_square의 결과를 순서대로 포함한다.
- for result in results:는 결과를 순서대로 출력한다.
- ProcessPoolExecutor 예제:
- ProcessPoolExecutor(max_workers=3)는 최대 3개의 프로세스를 사용하여 작업을 병렬로 실행한다.
- 나머지는 ThreadPoolExecutor 예제와 동일하게 동작한다.
map 메서드의 장점
- 단순성: submit 메서드를 여러 번 호출하고 Future 객체를 관리하는 것보다 코드가 단순해진다.
- 순서 보장: 입력 값의 순서대로 결과를 반환한다. 이는 as_completed와 달리 결과의 순서를 보장한다.
- 효율성: map 메서드는 내부적으로 작업을 효율적으로 분배하고 실행한다.
고려 사항
- 블로킹: 모든 작업이 완료될 때까지 기다리기 때문에 매우 긴 작업이 있을 경우 전체 실행 시간이 길어질 수 있다.
- 청크 크기: ProcessPoolExecutor에서 chunksize를 적절히 설정하면 많은 수의 작은 작업을 처리할 때 오버헤드를 줄일 수 있다.
Executor.map 메서드는 병렬 처리를 간단하고 효율적으로 구현할 수 있는 강력한 도구이다. 스레드나 프로세스를 통해 병렬로 작업을 실행하면서 입력 값의 순서대로 결과를 반환해야 하는 경우에 매우 유용하다.
futures.as_completed
concurrent.futures.as_completed는 비동기 작업의 결과를 작업이 완료되는 대로 반환하는 이터레이터를 제공한다. 이는 여러 비동기 작업을 병렬로 실행할 때 유용하며, 작업이 완료되는 순서에 따라 결과를 처리할 수 있게 한다.
주요 개념
- 비동기 작업 추적: as_completed는 여러 Future 객체를 받아 작업이 완료되는 대로 결과를 처리할 수 있게 한다.
- 이터레이터: as_completed는 이터레이터를 반환하므로, for 루프를 사용하여 완료된 작업의 결과를 순서대로 처리할 수 있다.
- 비동기 실행: as_completed는 비동기 작업이 완료되는 순서에 따라 결과를 반환하므로, 작업이 완료되는 즉시 결과를 처리할 수 있다.
사용법
- concurrent.futures.as_completed(fs, timeout=None)
- fs: Future 객체의 반복 가능한(iterable) 객체.
- timeout: (선택 사항) 각 작업이 완료될 때까지 기다릴 최대 시간(초).
ThreadPoolExecutor 예제
import concurrent.futures
import time
def compute_square(n):
time.sleep(n)
return n * n
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(compute_square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
ProcessPoolExecutor 예제
import concurrent.futures
import time
def compute_square(n):
time.sleep(n)
return n * n
numbers = [1, 2, 3, 4, 5]
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(compute_square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
- ThreadPoolExecutor 예제:
- ThreadPoolExecutor(max_workers=5)는 최대 5개의 스레드를 사용하여 작업을 병렬로 실행한다.
- executor.submit(compute_square, num)는 각 숫자에 대해 compute_square 함수를 병렬로 실행하도록 작업을 제출한다.
- concurrent.futures.as_completed(futures)는 작업이 완료된 순서대로 Future 객체를 반환하는 이터레이터를 생성한다.
- future.result()를 호출하여 완료된 작업의 결과를 출력한다.
- ProcessPoolExecutor 예제:
- ProcessPoolExecutor(max_workers=5)는 최대 5개의 프로세스를 사용하여 작업을 병렬로 실행한다.
- 나머지는 ThreadPoolExecutor 예제와 동일하게 동작한다.
as_completed의 장점
- 즉각적인 결과 처리: 작업이 완료되는 대로 결과를 처리할 수 있어 효율적이다.
- 유연성: 작업 완료 순서에 따라 결과를 처리하므로, 실행 시간이 다른 여러 작업을 병렬로 실행할 때 유용하다.
- 타임아웃 처리: timeout 매개변수를 사용하여 각 작업이 완료될 때까지 기다릴 최대 시간을 설정할 수 있다.
고려 사항
- 작업의 완료 순서: as_completed는 작업이 완료된 순서대로 결과를 반환한다. 입력 순서와는 다를 수 있다.
- 타임아웃: 지정된 시간 내에 작업이 완료되지 않으면 concurrent.futures.TimeoutError가 발생한다.
as_completed의 작동 방식
concurrent.futures.as_completed 함수는 여러 비동기 작업을 추적하고, 작업이 완료되는 대로 결과를 반환하는 이터레이터를 제공한다. 이는 작업의 완료 순서와 상관없이, 완료된 작업부터 처리할 수 있게 해준다. 이를 통해 실행 시간이 불규칙한 작업을 병렬로 실행할 때 효율적으로 결과를 처리할 수 있다.
작동 예시
다음 예시는 concurrent.futures.as_completed를 사용하는 두 가지 예시(ThreadPoolExecutor와 ProcessPoolExecutor)를 보여준다.
ThreadPoolExecutor 예제
import concurrent.futures
import time
def compute_square(n):
time.sleep(n) # Simulate a delay based on the input number
return n * n
numbers = [5, 4, 3, 2, 1]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(compute_square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
이 예제에서는 compute_square 함수가 입력값에 따라 다른 시간 동안 대기한 후, 제곱 값을 반환한다. ThreadPoolExecutor를 사용하여 각 작업을 병렬로 실행하고, as_completed를 사용하여 작업이 완료되는 대로 결과를 출력한다.
ProcessPoolExecutor 예제
import concurrent.futures
import time
def compute_square(n):
time.sleep(n) # Simulate a delay based on the input number
return n * n
numbers = [5, 4, 3, 2, 1]
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(compute_square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
이 예제는 ProcessPoolExecutor를 사용하여 동일한 작업을 프로세스 풀에서 병렬로 실행한다. 이는 CPU 집약적인 작업에 더 적합하다.
as_completed 함수의 장점
- 작업 완료 순서에 따른 결과 처리: 입력된 순서가 아닌, 작업이 완료된 순서대로 결과를 처리할 수 있어 효율적이다.
- 즉각적인 결과 처리: 작업이 완료되면 즉시 결과를 얻을 수 있어 전체 작업 시간이 줄어들 수 있다.
- 복잡한 작업 조정: 실행 시간이 불규칙한 여러 작업을 병렬로 실행할 때 유용하다.
추가 기능
- 타임아웃 설정: as_completed 함수의 timeout 매개변수를 사용하면 작업이 완료될 때까지 기다릴 최대 시간을 설정할 수 있다. 타임아웃이 초과되면 concurrent.futures.TimeoutError 예외가 발생한다.
- 예외 처리: Future.result()를 호출할 때 작업 중에 발생한 예외가 있으면 해당 예외가 발생한다. 이를 통해 각 작업의 예외를 개별적으로 처리할 수 있다.
종합적인 예제
import concurrent.futures
import time
def compute_square(n):
if n == 3:
raise ValueError("Intentional error for number 3")
time.sleep(n)
return n * n
numbers = [5, 4, 3, 2, 1]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(compute_square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures, timeout=10):
try:
result = future.result()
except concurrent.futures.TimeoutError:
print("A task timed out")
except Exception as exc:
print(f"A task generated an exception: {exc}")
else:
print(f"Result: {result}")
이 예제는 숫자 3에 대해 의도적으로 예외를 발생시키고, 각 작업의 결과를 안전하게 처리하며, 타임아웃이 발생할 경우를 처리한다.
모든 코드는 github에 저장되어 있습니다.
'Python study' 카테고리의 다른 글
property (0) | 2024.06.06 |
---|---|
asynico를 이용한 동시성 (1) | 2024.06.06 |
코루틴 (coroutine) (1) | 2024.06.04 |
콘텍스트 관리자와 else 블록 (0) | 2024.06.04 |
제너레이터 (Generator) (1) | 2024.05.29 |