Python/파이썬 문법, 함수, 모듈 등

[Python] 2. Multiprocessing(멀티 프로세싱) 결과를 정렬하여 받는 방법, 공유할 데이터가 있을 때의 방법 multiprocessing.Manager()

dong_su 2024. 5. 28. 17:12

결과를 정렬하여 받는 방법 

프로세스들이 병렬로 실행되기 때문에,결과값의 순서가 보장되지 않으므로, 정렬을 통해 순서를 맞추어 출력하면 된다.

실제 작업은 병렬로 작업하여 순서가 맞지 않지만, 결과만 순서대로 받고 작업이 전부 끝날 때 한번에 받는다.

 

예시)

import multiprocessing

def update_data(index, url):
    # 데이터 처리 로직
    return f"{index}\t{url}"

if __name__ == "__main__":
    # 데이터 준비
    index_list = [1, 2, 3, 4]
    url_list = ["url1", "url2", "url3", "url4"]

    # 멀티프로세싱 Pool 객체 생성
    with multiprocessing.Pool(processes=12) as pool:
        # 병렬로 작업을 수행하고 결과를 리스트로 반환
        res = pool.starmap(update_data, zip(index_list, url_list))
    
    # 결과 정렬
    data_dict = {}
    for string in res:
        parts = string.strip().split("\t")
        index = int(parts[0])
        data_dict[index] = parts[1:]
    
    # 딕셔너리를 정렬하여 순서대로 출력
    sorted_list = dict(sorted(data_dict.items()))

    for key, value in sorted_list.items():
        value = "\t".join(value)
        print(f"{key}\t{value}")

공유할 데이터가 있을 때 방법

멀티프로세싱을 사용하면 각 프로세스가 별도의 메모리 공간을 가지므로 데이터 공유에 대한 문제가 발생

이를 해결하기 위해 multiprocessing.Manager()를 사용하여 공유 메모리에 접근하고 관리

 

예시)

if processes is not None:
    print("=" * 48)
    print(f"processes 개수 : {processes}")
    print("=" * 48)

    # 멀티프로세싱을 위해 프로세스 개수 설정
    mp_cnt = min(processes, total_cnt)

    # 프로세스별로 데이터를 분할하여 리스트에 저장
    mp_data_lst = [[] for _ in range(mp_cnt)]
    for idx, data in enumerate(input_lst):
        mp_data_lst[idx % mp_cnt].append(f'{idx}\t{data}')

    # 공유 메모리 객체 생성
    manager = multiprocessing.Manager()
    # 공유 리스트 생성
    partner_lst = manager.list(partner_lst)

    # 각 프로세스의 진행 상태를 기록하는 리스트 생성
    process_done = manager.list()
    # 결과를 저장할 공유 딕셔너리 생성
    output_dic = manager.dict()

    processes_list = []
    # 각 프로세스에 대해 작업 시작
    for data in mp_data_lst:
        p = multiprocessing.Process(target=proc_func, args=(data, partner_lst, process_done, output_dic, total_cnt))
        p.start()
        processes_list.append(p)

    # 모든 프로세스가 종료될 때까지 대기
    for p in processes_list:
        p.join()

    # 프로세스 객체 타입을 딕셔너리로 변환
    output_dic = dict(output_dic)

    # 결과를 정렬하여 파일에 기록
    sorted_output_dic = sorted(output_dic.keys())
    for key in sorted_output_dic:
        result = output_dic[key]
        with open(파일, "a") as f :
            f.write(result)

-> proc_func 이라는 함수에 args로 받은 데이터를 처리하고 처리 결과는 output_dic의 key에는 인덱스, value에는 결과를 저장