
Python Primer Series: Multiprocessing

 xiaoyimin 2019-02-14

Differences between multiprocessing and multithreading

  • Because of the GIL, Python multithreading does not run very efficiently: it cannot take advantage of a multi-core CPU, and it only achieves real concurrency for I/O-bound workloads.
  • For CPU-bound logic (typically heavy computation), multiprocessing performs much better on a multi-core CPU.
  • The operating system's cost of scheduling a process is much higher than that of scheduling a thread (see the rough timing sketch below).
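As a rough illustration of the last point, here is a minimal sketch, assuming a trivial noop task and a hypothetical measure helper; absolute numbers vary by machine, but creating and joining processes is generally much more expensive than doing the same with threads.

import time
import threading
import multiprocessing

def noop():
    pass

def measure(worker_cls, count=100):
    # worker_cls is threading.Thread or multiprocessing.Process;
    # both expose the same start()/join() interface.
    start = time.time()
    workers = [worker_cls(target=noop) for _ in range(count)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    print('100 threads:   {0:.3f}s'.format(measure(threading.Thread)))
    print('100 processes: {0:.3f}s'.format(measure(multiprocessing.Process)))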

A comparison of multithreading and multiprocessing

1. Compute Fibonacci numbers with both multithreading and multiprocessing, using the thread/process pools provided by the concurrent.futures module.

import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

def fib(n):
    return 1 if n <= 2 else fib(n-1) + fib(n-2)

if __name__ == '__main__':
    # with ProcessPoolExecutor(3) as executor:
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, n) for n in range(25, 35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            # todo
        end_time = time.time()
        print('time consuming by threads: {0}s'.format(end_time - start_time))
        # print('time consuming by processes: {0}s'.format(end_time - start_time))

Comparing the results of the two approaches:

# result:
# time consuming by threads: 4.823292016983032s
# time consuming by processes: 3.3890748023986816s

As you can see, for computation-heavy tasks multiprocessing is noticeably more efficient than multithreading. The example also shows that, with concurrent.futures, the thread pool and the process pool expose the same interface and are used in exactly the same way. Note, however, that when using multiprocessing on Windows the related logic must be placed under if __name__ == '__main__'; Linux has no such restriction.
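To make the shared interface concrete, here is a minimal sketch using a hypothetical run_with helper: the same Fibonacci workload runs through either pool class simply by passing the class in.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def fib(n):
    return 1 if n <= 2 else fib(n-1) + fib(n-2)

def run_with(executor_cls, workers=3):
    # Both pool classes implement the same Executor interface,
    # so the surrounding code does not change at all.
    start = time.time()
    with executor_cls(workers) as executor:
        tasks = [executor.submit(fib, n) for n in range(25, 35)]
        for future in as_completed(tasks):
            future.result()
    return time.time() - start

if __name__ == '__main__':
    print('threads:   {0:.2f}s'.format(run_with(ThreadPoolExecutor)))
    print('processes: {0:.2f}s'.format(run_with(ProcessPoolExecutor)))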

2. Simulate an I/O-bound workload with both multithreading and multiprocessing. The defining feature of I/O operations is that the CPU spends most of its time waiting for data, which we can simulate here with sleep.

The overall structure stays the same; the modified logic is as follows:

def random_sleep(n):
    time.sleep(n)
    return n

...

# 8 workers; each of the 30 tasks sleeps 2 seconds to simulate I/O
with ProcessPoolExecutor(8) as executor:
# with ThreadPoolExecutor(8) as executor:
    all_task = [executor.submit(random_sleep, 2) for i in range(30)]

# result:
# time consuming by threads: 8.002903699874878s
# time consuming by processes: 8.34946894645691s

Multiprocess programming

Using Process directly

import time
import multiprocessing

def read(times):
    time.sleep(times)
    print('process reading...')
    return 'read for {0}s'.format(times)

def write(times):
    time.sleep(times)
    print('process writing...')
    return 'write for {0}s'.format(times)

if __name__ == '__main__':
    read_process = multiprocessing.Process(target=read, args=(1,))
    write_process = multiprocessing.Process(target=write, args=(2,))
    read_process.start()
    write_process.start()
    print('read_process id {rid}'.format(rid=read_process.pid))
    print('write_process id {wid}'.format(wid=write_process.pid))
    read_process.join()
    write_process.join()
    print('done')

# result:
# read_process id 7064
# write_process id 836
# process reading...
# process writing...
# done

As you can see, multiprocessing is used in much the same way as multithreading. Remember that on Windows any process-related logic must live inside the if __name__ == '__main__' block. For the other methods on Process, see the official documentation.
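As a taste of what else Process offers, here is a small sketch with a hypothetical Worker class: you can subclass Process and put the task logic in run(), and query attributes such as name, is_alive() and exitcode.

import time
import multiprocessing

class Worker(multiprocessing.Process):
    # Subclassing Process: the task logic goes in run()
    def __init__(self, seconds):
        super().__init__()
        self.seconds = seconds

    def run(self):
        time.sleep(self.seconds)
        print('{0} finished after {1}s'.format(self.name, self.seconds))

if __name__ == '__main__':
    worker = Worker(1)
    worker.start()
    print('alive before join: {0}'.format(worker.is_alive()))
    worker.join()
    print('alive after join: {0}'.format(worker.is_alive()))
    print('exit code: {0}'.format(worker.exitcode))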

Using the native process pool

import time
import multiprocessing

def read(times):
    time.sleep(times)
    print('process reading...')
    return 'read for {0}s'.format(times)

def write(times):
    time.sleep(times)
    print('process writing...')
    return 'write for {0}s'.format(times)

if __name__ == '__main__':
    # multiprocessing.cpu_count() returns the number of CPU cores
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    read_result = pool.apply_async(read, args=(2,))
    write_result = pool.apply_async(write, args=(3,))
    # Close the pool so it accepts no new tasks; otherwise join() raises an error
    pool.close()
    # Wait for all tasks submitted to the pool to finish
    pool.join()
    print(read_result.get())
    print(write_result.get())

# result:
# process reading...
# process writing...
# read for 2s
# write for 3s

With imap, the tasks are distributed to the pool but the results come back in the order of the inputs:

pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(read, [2, 1, 3]):
    print(result)

# result:
# process reading...
# process reading...
# read for 2s
# read for 1s
# process reading...
# read for 3s

With imap_unordered, whichever task finishes first has its result returned first:

for result in pool.imap_unordered(read, [1, 5, 3]):
    print(result)

# process reading...
# read for 1s
# process reading...
# read for 3s
# process reading...
# read for 5s

Using ProcessPoolExecutor from concurrent.futures

This was already covered in the multithreading vs. multiprocessing comparison above. Since it is used exactly like its thread-pool counterpart, it is not repeated here; see the examples in the official documentation.
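For completeness, a minimal sketch in the spirit of the documented examples (the square function is just a placeholder): ProcessPoolExecutor.map distributes the inputs across worker processes and yields results in input order.

from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map works like the built-in map, but each call runs in a worker process
        for n, result in zip(range(5), executor.map(square, range(5))):
            print('{0} squared is {1}'.format(n, result))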

Inter-process communication

Communication between processes differs from communication between threads: the locking mechanisms and shared global variables used for thread communication no longer apply here, so we need new tools for inter-process communication.
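To see why ordinary global variables do not work, consider this small sketch (the counter variable is purely illustrative): each process works on its own copy of the module-level value, so a change made in the child is invisible to the parent.

import multiprocessing

counter = 0  # a module-level "global"

def increment():
    global counter
    counter += 1
    print('in child process: counter = {0}'.format(counter))

if __name__ == '__main__':
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    # The child only modified its own copy; the parent's value is unchanged
    print('in parent process: counter = {0}'.format(counter))

# expected output:
# in child process: counter = 1
# in parent process: counter = 0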

Using multiprocessing.Queue

It is used in the same way as the Queue in multithreading (see the documentation for the full set of methods). Note that this form of communication cannot be used with processes created through a Pool.

import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Queue(1)
    queue.put(0)
    plus_process = multiprocessing.Process(target=plus, args=(queue,))
    subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
    plus_process.start()
    subtract_process.start()

# result:
# 1
# 1
# 2
# 2
# 3
# 3
# 0
# 1
# 2
# 2
# 1
# 0

Using the Queue from Manager

Manager returns an object that manages synchronization between processes and offers several ways of sharing data across them.

import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Manager().Queue(1)  # note the slightly unusual way it is created
    # queue = multiprocessing.Queue(1)  # this no longer works with a Pool
    pool = multiprocessing.Pool(2)
    queue.put(0)
    pool.apply_async(plus, args=(queue,))
    pool.apply_async(subtract, args=(queue,))
    pool.close()
    pool.join()

# result:
# 0
# 1
# 1
# 2
# 2
# 3
# -1
# 0
# 1
# 2
# 1
# 0

Using the list from Manager

Multiple processes can share a global list. Because it is shared across processes, a lock is used to keep access to it safe. The Manager's Lock here is not the thread-level Lock from earlier; it provides synchronization between processes.

import multiprocessing as mp
import time

def add_person(waiting_list, name_list, lock):
    lock.acquire()
    for name in name_list:
        waiting_list.append(name)
        time.sleep(1)
        print(waiting_list)
    lock.release()

def get_person(waiting_list, lock):
    lock.acquire()
    if waiting_list:
        name = waiting_list.pop(0)
        print('get {0}'.format(name))
    lock.release()

if __name__ == '__main__':
    manager = mp.Manager()
    waiting_list = manager.list()
    lock = manager.Lock()  # use the lock to limit access to the shared list
    name_list = ['MetaTian', 'Rity', 'Anonymous']
    add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
    get_process = mp.Process(target=get_person, args=(waiting_list, lock))
    add_process.start()
    get_process.start()
    add_process.join()
    get_process.join()
    print(waiting_list)

# result:
# ['MetaTian']
# ['MetaTian', 'Rity']
# ['MetaTian', 'Rity', 'Anonymous']
# get MetaTian
# ['Rity', 'Anonymous']

Manager provides many more tools for inter-process communication; see the official documentation.
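For example, a Manager can also hand out a shared dict proxy. The sketch below, with a hypothetical record helper, collects results from several processes into one dictionary.

import multiprocessing as mp

def record(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == '__main__':
    manager = mp.Manager()
    shared_dict = manager.dict()  # a dict proxy shared across processes
    jobs = [mp.Process(target=record, args=(shared_dict, i, i * i)) for i in range(3)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    print(dict(shared_dict))  # e.g. {0: 0, 1: 1, 2: 4}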

Using Pipe

Pipe is only suitable for communication between two processes, but it performs better than Queue. Pipe returns two Connection objects, which can be used to send and receive data between the processes, very much like the socket objects discussed earlier. See the documentation on Connection for details.

import multiprocessing

def plus(conn):
    for i in range(3):
        num = 0 if i == 0 else conn.recv()
        conn.send(num + 1)
        print('plus send: {0}'.format(num + 1))

def subtract(conn):
    for i in range(3):
        num = conn.recv()
        conn.send(num - 1)
        print('subtract send: {0}'.format(num - 1))

if __name__ == '__main__':
    conn_plus, conn_subtract = multiprocessing.Pipe()
    plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
    subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
    plus_process.start()
    subtract_process.start()

# result:
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0

send can keep sending data continuously, while recv takes out the data sent by the other end one item at a time; if no data is available, recv blocks and waits.
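The blocking behaviour of recv can be seen in this small sketch (the producer function is a made-up example): the consumer blocks in recv() until data arrives, and once the producer closes its end of the pipe, recv() raises EOFError, which is a convenient way to detect the end of the stream.

import multiprocessing
import time

def producer(conn):
    for i in range(3):
        conn.send(i)  # send returns immediately; the data is buffered in the pipe
        time.sleep(1)
    conn.close()      # closing this end signals EOF to the reader

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    process = multiprocessing.Process(target=producer, args=(child_conn,))
    process.start()
    child_conn.close()  # the parent no longer needs the child's end of the pipe
    try:
        while True:
            print('received: {0}'.format(parent_conn.recv()))  # blocks until data arrives
    except EOFError:
        print('producer closed its end of the pipe')
    process.join()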


(End)
