日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Python:多線程編程

 昵稱61973759 2019-02-02

1.IO編程

IO(input/output)。凡是用到數(shù)據(jù)交換的地方,都會(huì)涉及io編程,例如磁盤,網(wǎng)絡(luò)的數(shù)據(jù)傳輸。在IO編程中,stream(流)是一種重要的概念,分為輸入流(input stream)和輸出流(output stream)??梢园蚜骷竟?jié)為一個(gè)水管,數(shù)據(jù)相當(dāng)于水管中的水,但是只能單向流動(dòng),所以數(shù)據(jù)傳輸過程中需要假設(shè)兩個(gè)水管,一個(gè)負(fù)責(zé)輸入,一個(gè)負(fù)責(zé)輸出,這樣讀寫就可以實(shí)現(xiàn)同步。

2.GIL:global interpreter lock全局解釋器鎖

在Python中,GIL導(dǎo)致同一時(shí)刻只有一個(gè)線程進(jìn)入解釋器。

計(jì)算機(jī)有些操作不依賴于CPU,I/O操作幾乎不利用,IO密集型不占用CPU,多線程可完成

計(jì)算操作都要用到CPU,適合多進(jìn)程

  • IO密集型任務(wù)或函數(shù),可以用多線程
  • 計(jì)算密集型任務(wù)函數(shù),sorry,改C
  • 計(jì)算密集型程序適合C語言多線程,I/O密集型適合腳本語言開發(fā)的多線程

線程和進(jìn)程的區(qū)別:

  • 線程共享創(chuàng)建它的進(jìn)程的地址空間; 進(jìn)程有自己的地址空間。
  • 線程可以直接訪問其進(jìn)程的數(shù)據(jù)段; 進(jìn)程有自己的父進(jìn)程的數(shù)據(jù)段副本。
  • 線程可以直接與其進(jìn)程的其他線程進(jìn)行通信; 進(jìn)程必須使用進(jìn)程間通信來與兄弟進(jìn)程進(jìn)行通信。
  • 新線程很容易創(chuàng)建; 新進(jìn)程需要父進(jìn)程的重復(fù)。
  • 線程可以對(duì)相同進(jìn)程的線程進(jìn)行相當(dāng)?shù)目刂?
  • 流程只能控制子進(jìn)程。
  • 對(duì)主線程的更改(取消,優(yōu)先級(jí)更改等)可能會(huì)影響進(jìn)程的其他線程的行為; 對(duì)父進(jìn)程的更改不會(huì)影響子進(jìn)程。

進(jìn)程優(yōu)點(diǎn):同時(shí)利用多個(gè)CPU,同時(shí)進(jìn)行多個(gè)操作;缺點(diǎn):耗費(fèi)資源(需要開辟多個(gè)內(nèi)存空間)

線程優(yōu)點(diǎn):共享多個(gè)資源,IO操作,創(chuàng)造并發(fā)操作;缺點(diǎn):搶占資源

3.threading模塊

threading模塊對(duì)象
復(fù)制代碼
Thread    表示一個(gè)線程的執(zhí)行的對(duì)象
Lock    鎖原語對(duì)象(跟thread模塊里的鎖對(duì)象相同)
RLock    可重入鎖對(duì)象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定)
Condition條件變量對(duì)象能讓一個(gè)線程停下來,等待其他線程滿足了某個(gè)“條件”。如狀態(tài)的改變或值的改變
Event    通用的條件變量。多個(gè)線程可以等待某個(gè)時(shí)間的發(fā)生,在事件發(fā)生后,所有的線程都被激活
Semaphore    為等待鎖的線程提供一個(gè)類似“等候室”的結(jié)構(gòu)
BoundedSemaphore    與Semaphore類似,只是它不允許超過初始值
Timer    與thread類似,只是它要等待一段時(shí)間后才開始運(yùn)行
Barrier    創(chuàng)建一個(gè)障礙,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù)
復(fù)制代碼

3.1線程的兩種調(diào)用方式

按 Ctrl+C 復(fù)制代碼
按 Ctrl+C 復(fù)制代碼
復(fù)制代碼
#繼承式調(diào)用
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num
        
    def run(self):  #定義每個(gè)線程要運(yùn)行的函數(shù)
        print("running on number:%s" % self.num)
        time.sleep(4)

if __name__ == '__main__':
    t1 = MyThread(111)
    t2 = MyThread(222)
    t1.start()
    t2.start()
復(fù)制代碼

threading的Thread類主要的運(yùn)行對(duì)象

復(fù)制代碼
函數(shù)
start()                開始線程的執(zhí)行
run()                定義線程的功能的函數(shù)(一般會(huì)被子類重寫)
join(timeout=None)    程序掛起,直到線程結(jié)束;如果給了timeout,則最多阻塞timeout秒
getName()            返回線程的名字
setName(name)        設(shè)置線程的名字
isAlive()            布爾標(biāo)志,表示這個(gè)線程是否還在運(yùn)行中
isDaemon()            返回線程的daemon標(biāo)志
setDaemon(daemonic)    把線程的daemon標(biāo)志設(shè)為daemonic(一定要在調(diào)用start()函數(shù)前調(diào)用)
threading.currentThread(): 返回當(dāng)前的線程變量。
threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。
復(fù)制代碼

3.2 join and setDaemon

按 Ctrl+C 復(fù)制代碼
按 Ctrl+C 復(fù)制代碼

join()會(huì)等到線程結(jié)束,或者在給了timeout參數(shù)的時(shí)候,等到超時(shí)為止。使用join()比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(也稱“自旋鎖”)。

join()的另一個(gè)比較重要的方法是它可以完全不用調(diào)用。一旦線程啟動(dòng)后,就會(huì)一直運(yùn)行,直到線程的函數(shù)結(jié)束,退出為止。
如果你的主線程除了等線程結(jié)束外,還有其他的事情要做(如處理或等待其他的客戶請(qǐng)求),那就不用調(diào)用join(),只有在你要等待線程結(jié)束的時(shí)候才要調(diào)用join()。

setDaemon(True):將線程聲明為守護(hù)線程,必須在start() 方法調(diào)用之前設(shè)置, 如果不設(shè)置為守護(hù)線程程序會(huì)被無限掛起。這個(gè)方法基本和join是相反的。當(dāng)我們 在程序運(yùn)行中,執(zhí)行一個(gè)主線程,如果主線程又創(chuàng)建一個(gè)子線程,主線程和子線程 就分兵兩路,分別運(yùn)行,那么當(dāng)主線程完成想退出時(shí),會(huì)檢驗(yàn)子線程是否完成。如 果子線程未完成,則主線程會(huì)等待子線程完成后再退出。但是有時(shí)候我們需要的是 只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時(shí)就可以 用setDaemon方法

3.3 同步鎖Lock

由于線程之間是進(jìn)行隨機(jī)調(diào)度,并且每個(gè)線程可能只執(zhí)行n條執(zhí)行之后,當(dāng)多個(gè)線程同時(shí)修改同一條數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)臟數(shù)據(jù),所以,出現(xiàn)了線程鎖 - 同一時(shí)刻允許一個(gè)線程執(zhí)行操作。

復(fù)制代碼
import time
import threading
def addNum():
    global num #在每個(gè)線程中都獲取這個(gè)全局變量
    num-=1
    temp=num
    print('--get num:',num )
    print('看下結(jié)果是0')
    print('再看下結(jié)果')
    time.sleep(0.001)
    num =temp-1 #對(duì)此公共變量進(jìn)行-1操作

num = 100  #設(shè)定一個(gè)共享變量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有線程執(zhí)行完畢
    t.join()

print('final num:', num )
復(fù)制代碼

注意:

1:  why num-=1沒問題呢?這是因?yàn)閯?dòng)作太快(完成這個(gè)動(dòng)作在切換的時(shí)間內(nèi))

2: if sleep(1),現(xiàn)象會(huì)更明顯,100個(gè)線程每一個(gè)一定都沒有執(zhí)行完就進(jìn)行了切換,我們說過sleep就等效于IO阻塞,1s之內(nèi)不會(huì)再切換回來,所以最后的結(jié)果一定是99.

多個(gè)線程都在同時(shí)操作同一個(gè)共享資源,所以造成了資源破壞,怎么辦呢?

有同學(xué)會(huì)想用join唄,但join會(huì)把整個(gè)線程給停住,造成了串行,失去了多線程的意義,而我們只需要把計(jì)算(涉及到操作公共數(shù)據(jù))的時(shí)候串行執(zhí)行。

我們可以通過同步鎖來解決這種問題

復(fù)制代碼
import time
import threading
begin=time.time()
def addNum():
    global num #在每個(gè)線程中都獲取這個(gè)全局變量
    # num-=1
    lock.acquire()
    temp=num
    # print('--get num:',num )
    time.sleep(0.0001)
    num =temp-1 #對(duì)此公共變量進(jìn)行-1操作
    lock.release()
num = 100  #設(shè)定一個(gè)共享變量
thread_list = []
lock=threading.Lock()
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list: #等待所有線程執(zhí)行完畢
    t.join()
end=time.time()
print('final num:', num )
print(end-begin)
復(fù)制代碼

同步鎖與GIL的關(guān)系?

Python的線程在GIL的控制之下,線程之間,對(duì)整個(gè)python解釋器,對(duì)python提供的C API的訪問都是互斥的,這可以看作是Python內(nèi)核級(jí)的互斥機(jī)制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機(jī)制———用戶級(jí)互斥。內(nèi)核級(jí)通過互斥保護(hù)了內(nèi)核的共享資源,同樣,用戶級(jí)互斥保護(hù)了用戶程序中的共享資源。

GIL 的作用是:對(duì)于一個(gè)解釋器,只能有一個(gè)thread在執(zhí)行bytecode。所以每時(shí)每刻只有一條bytecode在被執(zhí)行一個(gè)thread。GIL保證了bytecode 這層面上是thread safe的。
但是如果你有個(gè)操作比如 x += 1,這個(gè)操作需要多個(gè)bytecodes操作,在執(zhí)行這個(gè)操作的多條bytecodes期間的時(shí)候可能中途就換thread了,這樣就出現(xiàn)了data races的情況了。

Lock(指令鎖)是可用的最低級(jí)的同步指令。Lock處于鎖定狀態(tài)時(shí),不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定,
以及兩個(gè)基本的方法??梢哉J(rèn)為Lock有一個(gè)鎖定池,當(dāng)線程請(qǐng)求鎖定時(shí),將線程至于池中,直到獲得鎖定后出池。
池中的線程處于狀態(tài)圖中的同步阻塞狀態(tài)。
構(gòu)造方法:Lock()
實(shí)例方法:
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

3.4 線程死鎖和遞歸鎖

在線程間共享多個(gè)資源的時(shí)候,如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源,就會(huì)造成死鎖,因?yàn)橄到y(tǒng)判斷這部分資源都正在使用,所有這兩個(gè)線程在無外力作用下將一直等待下去。

RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,
處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)。
可以認(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。
構(gòu)造方法:RLock()
實(shí)例方法:acquire([timeout])/release(): 跟Lock差不多。

View Code

解決辦法:使用遞歸鎖,將 lockA=threading.Lock() lockB=threading.Lock()

改為Rlock
為了支持在同一線程中多次請(qǐng)求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次acquire。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源。

View Code

3.5 單線程和多線程執(zhí)行對(duì)比

View Code

3.6 條件變量同步(Condition)

      有一類線程需要滿足條件之后才能夠繼續(xù)執(zhí)行,Python提供了threading.Condition 對(duì)象用于條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。

 lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項(xiàng),不傳人鎖,對(duì)象自動(dòng)創(chuàng)建一個(gè)RLock()。Condition(條件變量)通常與一個(gè)鎖關(guān)聯(lián)。需要在多個(gè)Contidion中共享一個(gè)鎖時(shí),可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例??梢哉J(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。

構(gòu)造方法: Condition([lock/rlock])
實(shí)例方法:
acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);
其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。
調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
wait():條件不滿足時(shí)調(diào)用,線程會(huì)釋放鎖并進(jìn)入等待阻塞;
notify():條件創(chuàng)造后調(diào)用,通知等待池激活一個(gè)線程;
notifyAll():條件創(chuàng)造后調(diào)用,通知等待池激活所有線程。

View Code

3.7 事件event

python線程的事件用于主線程控制其他線程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。

事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞。

  • clear:將“Flag”設(shè)置為False
  • set:將“Flag”設(shè)置為True

event.isSet():返回event的狀態(tài)值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;

event.clear():恢復(fù)event的狀態(tài)值為False。

復(fù)制代碼
#__author: greg
#date: 2017/9/18 19:37
import threading
import time

event = threading.Event()

def func():
    # 等待事件,進(jìn)入等待阻塞狀態(tài)
    print('%s wait for event...' % threading.currentThread().getName())
    event.wait()
    # 收到事件后進(jìn)入運(yùn)行狀態(tài)
    print('%s recv event.' % threading.currentThread().getName())

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
time.sleep(2)
print('MainThread set event.')# 發(fā)送事件通知
event.set()
復(fù)制代碼

threading基于Java的線程模型設(shè)計(jì)。鎖(Lock)和條件變量(Condition)在Java中是對(duì)象的基本行為(每一個(gè)對(duì)象都自帶了鎖和條件變量),
而在Python中則是獨(dú)立的對(duì)象。Python Thread提供了Java Thread的行為的子集;沒有優(yōu)先級(jí)、線程組,線程也不能被停止、暫停、恢復(fù)、中斷。
Java Thread中的部分被Python實(shí)現(xiàn)了的靜態(tài)方法在threading中以模塊方法的形式提供。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當(dāng)前的線程變量。
threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。
threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。

Event(事件)是最簡單的線程通信機(jī)制之一:一個(gè)線程通知事件,其他線程等待事件。
Event內(nèi)置了一個(gè)初始為False的標(biāo)志,當(dāng)調(diào)用set()時(shí)設(shè)為True,調(diào)用clear()時(shí)重置為 False
wait()將阻塞線程至等待阻塞狀態(tài)。Event其實(shí)就是一個(gè)簡化版的 Condition
Event沒有鎖,無法使線程進(jìn)入同步阻塞狀態(tài)。
構(gòu)造方法:
Event()實(shí)例方法
isSet(): 當(dāng)內(nèi)置標(biāo)志為True時(shí)返回True。
set(): 將標(biāo)志設(shè)為True,并通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運(yùn)行狀態(tài)。
clear(): 將標(biāo)志設(shè)為False。
wait([timeout]): 如果標(biāo)志為True將立即返回,否則阻塞線程至等待阻塞狀態(tài),等待其他線程調(diào)用set()。
3.8信號(hào)量(Semaphore)

Semaphore/BoundedSemaphore

Semaphore(信號(hào)量)是計(jì)算機(jī)科學(xué)史上最古老的同步指令之一。Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,

每當(dāng)調(diào)用acquire()時(shí)-1,調(diào)用release() 時(shí)+1。計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),
acquire()將阻塞線程至同步鎖定狀態(tài),直到其他線程調(diào)用release()。
基于這個(gè)特點(diǎn),Semaphore經(jīng)常用來同步一些有“訪客上限”的對(duì)象,比如連接池。
BoundedSemaphore 與Semaphore的唯一區(qū)別在于前者將在調(diào)用release()時(shí)檢查計(jì)數(shù)器的值是否超過了計(jì)數(shù)器的初始值,
如果超過了將拋出一個(gè)異常。
構(gòu)造方法: Semaphore(value=1): value是計(jì)數(shù)器的初始值。
實(shí)例方法:
acquire([timeout]): 請(qǐng)求Semaphore。如果計(jì)數(shù)器為0,將阻塞線程至同步阻塞狀態(tài);否則將計(jì)數(shù)器-1并立即返回。
release(): 釋放Semaphore,將計(jì)數(shù)器+1,如果使用BoundedSemaphore,還將進(jìn)行釋放次數(shù)檢查。
release()方法不檢查線程是否已獲得 Semaphore。


復(fù)制代碼
#__author: greg
#date: 2017/9/18 10:02
import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(3)
            semaphore.release()

if __name__=="__main__":
    semaphore=threading.Semaphore(10)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()
復(fù)制代碼
實(shí)例2

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多