본문 바로가기
Python/파이썬 문법, 함수, 모듈 등

[Python] 2. Multiprocessing(멀티 프로세싱) 결과를 정렬하여 받는 방법, 공유할 데이터가 있을 때의 방법 multiprocessing.Manager()

by dong_su 2024. 5. 28.

결과를 정렬하여 받는 방법 

프로세스들이 병렬로 실행되기 때문에,결과값의 순서가 보장되지 않으므로, 정렬을 통해 순서를 맞추어 출력하면 된다.

실제 작업은 병렬로 작업하여 순서가 맞지 않지만, 결과만 순서대로 받고 작업이 전부 끝날 때 한번에 받는다.

 

예시)

import multiprocessing

def update_data(index, url):
    # 데이터 처리 로직
    return f"{index}\t{url}"

if __name__ == "__main__":
    # 데이터 준비
    index_list = [1, 2, 3, 4]
    url_list = ["url1", "url2", "url3", "url4"]

    # 멀티프로세싱 Pool 객체 생성
    with multiprocessing.Pool(processes=12) as pool:
        # 병렬로 작업을 수행하고 결과를 리스트로 반환
        res = pool.starmap(update_data, zip(index_list, url_list))
    
    # 결과 정렬
    data_dict = {}
    for string in res:
        parts = string.strip().split("\t")
        index = int(parts[0])
        data_dict[index] = parts[1:]
    
    # 딕셔너리를 정렬하여 순서대로 출력
    sorted_list = dict(sorted(data_dict.items()))

    for key, value in sorted_list.items():
        value = "\t".join(value)
        print(f"{key}\t{value}")

공유할 데이터가 있을 때 방법

멀티프로세싱을 사용하면 각 프로세스가 별도의 메모리 공간을 가지므로 데이터 공유에 대한 문제가 발생

이를 해결하기 위해 multiprocessing.Manager()를 사용하여 공유 메모리에 접근하고 관리

 

예시)

if processes is not None:
    print("=" * 48)
    print(f"processes 개수 : {processes}")
    print("=" * 48)

    # 멀티프로세싱을 위해 프로세스 개수 설정
    mp_cnt = min(processes, total_cnt)

    # 프로세스별로 데이터를 분할하여 리스트에 저장
    mp_data_lst = [[] for _ in range(mp_cnt)]
    for idx, data in enumerate(input_lst):
        mp_data_lst[idx % mp_cnt].append(f'{idx}\t{data}')

    # 공유 메모리 객체 생성
    manager = multiprocessing.Manager()
    # 공유 리스트 생성
    partner_lst = manager.list(partner_lst)

    # 각 프로세스의 진행 상태를 기록하는 리스트 생성
    process_done = manager.list()
    # 결과를 저장할 공유 딕셔너리 생성
    output_dic = manager.dict()

    processes_list = []
    # 각 프로세스에 대해 작업 시작
    for data in mp_data_lst:
        p = multiprocessing.Process(target=proc_func, args=(data, partner_lst, process_done, output_dic, total_cnt))
        p.start()
        processes_list.append(p)

    # 모든 프로세스가 종료될 때까지 대기
    for p in processes_list:
        p.join()

    # 프로세스 객체 타입을 딕셔너리로 변환
    output_dic = dict(output_dic)

    # 결과를 정렬하여 파일에 기록
    sorted_output_dic = sorted(output_dic.keys())
    for key in sorted_output_dic:
        result = output_dic[key]
        with open(파일, "a") as f :
            f.write(result)

-> proc_func 이라는 함수에 args로 받은 데이터를 처리하고 처리 결과는 output_dic의 key에는 인덱스, value에는 결과를 저장