본문 바로가기
CS

[Python] 동시 실행자

by 왕밤빵도라에몽 2025. 5. 18.
728x90

동시성 스크립트 비교

리스트로 동시 작업 실행

def download_many(cc_list: list[str]) -> int:
    for cc in sorted(cc_list):
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)
    return len(cc_list)
  • 리스트 순차대로 반복하기 때문에 결과의 순서 또한 유지된다. (동시 작업 X 순차 작업 O)

concurrent.futures로 스레드로 동시작업 실행

ThreadPoolExecutor와 ProcessPoolExcutor클래스는 콜러블 객체를 서로 다른 스레드나 프로세스에서 실행하게 해 주는 API를 구현한다.

from concurrent import futures

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ThreadPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))
  • executor.map()은 내부적으로 Future를 사용하지만 결과를 입력 순서대로 반환하기 때문에 순서가 유지된다.

Future는 concurrent.futures와 asyncio의 핵심 요소이지만 사용자는 이를 직접 볼 수 없다.
그래도 어떻게 돌아가는지 알아보자.

Future

Future는 대기 중인 작업을 큐에 넣고, 완료 상태를 조사하고, 실행된 후에는 결과를 가져올 수 있도록 지연된 작업을 캡슐화한다.
하지만 이 인스턴스를 사용자가 직접 생성면 안 된다. cocurrent.futuresasyncio같은 동시성 프레임워크를 써서 생성해야한다.

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')

    return count
  • 결과의 순서가 보장되지 않는다
  • ThreadPoolExecutor()에서는 max_workers라는 인수로 동시에 실행할 최대 작업자 스레드 수를 설정할 수 있다.
  • executor.submit()은 콜러블이 실행되도록 스케줄링하고, 이 작업을 나타내는 Future객체를 반환한다
  • as_completed()로 가져올 수 있는 Future 객체를 모두 저장한다
  • as_completed()는 완료된 순서대로 Future를 순회할 수 있음

**concurrent.futures`로 프로세스로 동시 작업 실행

오늘 예시는 I/O작업이라 굳이 프로세싱을 여기서 쓸 필요는 없지만 암튼

from concurrent import futures

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))
  • ProcessPoolExecutor()는 여러 프로세스를 생성해서 각 작업을 병렬로 처리
  • executor.map()은 입력된 순서대로 결과를 반환한다
  • download_one()은 각 프로세스에서 실행되며, 독립된 메모리 공간을 사용

예외나 실패한 Future 처리를 하고 싶으면 as_completed() + future.result() 방식이 더 유연함

from concurrent import futures

def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5]

    with futures.ProcessPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        results = []
        for future in futures.as_completed(to_do):
            try:
                res: str = future.result()
            except Exception as e:
                print(f'Error: {e}')
            else:
                print(f'{future} result: {res!r}')
                results.append(res)

    return len(results)
  • executor.submit(): 각 국가 코드에 대해 하나의 프로세스를 생성해 download_one 실행
  • to_do: 생성된 Future 객체들을 저장
  • futures.as_completed(): 완료된 순서대로 Future를 순회함
  • future.result(): 실제 결과 값을 가져옴, 실패하면 예외 발생

References

-『 전문가를 위한 파이썬(2판) 』, 루시아누 하말류, 한빛미디어, Part 1 - Chapter 20

728x90