Differences Between Multiprocessing and Multithreading
Comparing Multithreading and Multiprocessing by Example

1. Compute the Fibonacci sequence with both multiprocessing and multithreading, in each case relying on the thread/process pools provided by the concurrent.futures module.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

def fib(n):
    return 1 if n <= 2 else fib(n-1) + fib(n-2)

if __name__ == '__main__':
    start_time = time.time()
    # with ProcessPoolExecutor(3) as executor:
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, n) for n in range(25, 35)]
        for future in as_completed(all_task):
            data = future.result()  # todo
    end_time = time.time()
    print('time consuming by threads: {0}s'.format(end_time - start_time))
    # print('time consuming by processes: {0}s'.format(end_time - start_time))
```

The two runs compare as follows:

```
# result:
# time consuming by threads: 4.823292016983032s
# time consuming by processes: 3.3890748023986816s
```

As you can see, for compute-heavy tasks multiprocessing is more efficient than multithreading (the GIL prevents Python threads from executing bytecode in parallel, so CPU-bound work gains nothing from extra threads). The example also shows that concurrent.futures exposes the same interface and usage pattern for thread pools and process pools. One caveat: when using multiprocessing on Windows, the relevant logic must live under the `if __name__ == '__main__'` guard; Linux has no such constraint.

2. Simulate I/O-bound work with both approaches. The hallmark of an I/O operation is that the CPU spends most of its time waiting for data, which a sleep call simulates well. The overall structure stays the same; the changed logic is:

```python
def random_sleep(n):
    time.sleep(n)
    return n
```
```python
...
# 8 workers, each task sleeping two seconds to simulate I/O
with ProcessPoolExecutor(8) as executor:
# with ThreadPoolExecutor(8) as executor:
    all_task = [executor.submit(random_sleep, 2) for i in range(30)]
```

```
# result:
# time consuming by threads: 8.002903699874878s
# time consuming by processes: 8.34946894645691s
```

For I/O-bound work the two approaches are nearly identical; the extra overhead of spawning processes even makes the thread version marginally faster here.

Using Multiprocessing Directly

```python
import time
import multiprocessing

def read(times):
    time.sleep(times)
    print('process reading...')
    return 'read for {0}s'.format(times)

def write(times):
    time.sleep(times)
    print('process writing...')
    return 'write for {0}s'.format(times)

if __name__ == '__main__':
    read_process = multiprocessing.Process(target=read, args=(1,))
    write_process = multiprocessing.Process(target=write, args=(2,))
    read_process.start()
    write_process.start()
    print('read_process id {rid}'.format(rid=read_process.pid))
    print('write_process id {wid}'.format(wid=write_process.pid))
    read_process.join()
    write_process.join()
    print('done')
```
```
# result:
# read_process id 7064
# write_process id 836
# process reading...
# process writing...
# done
```

As you can see, the multiprocessing API closely mirrors the threading API. Again, on Windows any process-related logic must be written under `if __name__ == '__main__'`. For the remaining methods, consult the official documentation.

Using the Native Process Pool

```python
import time
import multiprocessing

def read(times):
    time.sleep(times)
    print('process reading...')
    return 'read for {0}s'.format(times)

def write(times):
    time.sleep(times)
    print('process writing...')
    return 'write for {0}s'.format(times)

if __name__ == '__main__':
    # multiprocessing.cpu_count() returns the number of CPU cores
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    read_result = pool.apply_async(read, args=(2,))
    write_result = pool.apply_async(write, args=(3,))
    # Close the pool so it accepts no new tasks; otherwise join() raises an error
    pool.close()
    # Wait for every task submitted to the pool to finish
    pool.join()
    print(read_result.get())
    print(write_result.get())
```

```
# result:
# process reading...
# process writing...
# read for 2s
# write for 3s
```

With imap, results are yielded in the order the inputs were submitted (the tasks themselves still run in parallel):

```python
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(read, [2, 1, 3]):
    print(result)
```
```
# result:
# process reading...
# process reading...
# read for 2s
# read for 1s
# process reading...
# read for 3s
```

With imap_unordered, whichever task finishes first has its result returned first:

```python
for result in pool.imap_unordered(read, [1, 5, 3]):
    print(result)
```

```
# process reading...
# read for 1s
# process reading...
# read for 3s
# process reading...
# read for 5s
```

Using ProcessPoolExecutor from concurrent.futures

This already came up in the multithreading vs. multiprocessing comparison above. Since it is used exactly like the thread version, we won't repeat it here; see the example in the official documentation.

Inter-Process Communication

Communication between processes differs from communication between threads: the lock mechanisms and global variables used there no longer apply, so we need new tools for the job.

Using multiprocessing.Queue

Its usage matches that of the Queue used between threads (see its detailed methods in the documentation). Note that this form of communication cannot be used with processes created through a Pool.

```python
import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Queue(1)
    queue.put(0)
    plus_process = multiprocessing.Process(target=plus, args=(queue,))
    subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
    plus_process.start()
    subtract_process.start()
```
```
# result:
# 1
# 1
# 2
# 2
# 3
# 3
# 0
# 1
# 2
# 2
# 1
# 0
```

Using the Queue from Manager

Manager returns an object that handles synchronization between processes and provides several forms of sharing data across them.

```python
import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Manager().Queue(1)  # the construction is a little unusual
    # queue = multiprocessing.Queue(1)  # this no longer works with a Pool
    pool = multiprocessing.Pool(2)
    queue.put(0)
    pool.apply_async(plus, args=(queue,))
    pool.apply_async(subtract, args=(queue,))
    pool.close()
    pool.join()
```
```
# result:
# 0
# 1
# 1
# 2
# 2
# 3
# -1
# 0
# 1
# 2
# 1
# 0
```

Using the list from Manager

Multiple processes can share a global list. Because the list is shared between processes, a lock mechanism is used to keep access to it safe. Note that the Manager().Lock() here is not the thread-level Lock from earlier; it synchronizes across processes.

```python
import multiprocessing as mp
import time

def add_person(waiting_list, name_list, lock):
    lock.acquire()
    for name in name_list:
        waiting_list.append(name)
        time.sleep(1)
        print(waiting_list)
    lock.release()

def get_person(waiting_list, lock):
    lock.acquire()
    if waiting_list:
        name = waiting_list.pop(0)
        print('get {0}'.format(name))
    lock.release()

if __name__ == '__main__':
    manager = mp.Manager()
    waiting_list = manager.list()
    lock = manager.Lock()  # use the lock to limit access to the shared list
    name_list = ['MetaTian', 'Rity', 'Anonymous']
    add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
    get_process = mp.Process(target=get_person, args=(waiting_list, lock))
    add_process.start()
    get_process.start()
    add_process.join()
    get_process.join()
    print(waiting_list)
```
```
# result:
# ['MetaTian']
# ['MetaTian', 'Rity']
# ['MetaTian', 'Rity', 'Anonymous']
# get MetaTian
# ['Rity', 'Anonymous']
```

Manager offers more inter-process communication tools; see the official documentation.

Using Pipe

Pipe only works for communication between two processes, but its performance is higher than Queue's. Pipe() returns two Connection objects; through them the processes can send and receive data, much like the socket objects covered earlier. (See the Connection documentation.)

```python
import multiprocessing

def plus(conn):
    for i in range(3):
        # nothing to receive on the first round, so start from 0
        num = 0 if i == 0 else conn.recv()
        conn.send(num + 1)
        print('plus send: {0}'.format(num + 1))

def subtract(conn):
    for i in range(3):
        num = conn.recv()
        conn.send(num - 1)
        print('subtract send: {0}'.format(num - 1))

if __name__ == '__main__':
    conn_plus, conn_subtract = multiprocessing.Pipe()
    plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
    subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
    plus_process.start()
    subtract_process.start()
```

```
# result:
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
```

send() can be called repeatedly; recv() takes out the data sent from the other end in turn, and blocks waiting if no data has arrived.