1.IO編程 IO(input/output)。凡是用到數(shù)據(jù)交換的地方,都會(huì)涉及io編程,例如磁盤,網(wǎng)絡(luò)的數(shù)據(jù)傳輸。在IO編程中,stream(流)是一種重要的概念,分為輸入流(input stream)和輸出流(output stream)??梢园蚜骷竟?jié)為一個(gè)水管,數(shù)據(jù)相當(dāng)于水管中的水,但是只能單向流動(dòng),所以數(shù)據(jù)傳輸過程中需要假設(shè)兩個(gè)水管,一個(gè)負(fù)責(zé)輸入,一個(gè)負(fù)責(zé)輸出,這樣讀寫就可以實(shí)現(xiàn)同步。 2.GIL:global interpreter lock全局解釋器鎖 在Python中,GIL導(dǎo)致同一時(shí)刻只有一個(gè)線程進(jìn)入解釋器。 計(jì)算機(jī)有些操作不依賴于CPU,I/O操作幾乎不利用,IO密集型不占用CPU,多線程可完成 計(jì)算操作都要用到CPU,適合多進(jìn)程
線程和進(jìn)程的區(qū)別:
進(jìn)程優(yōu)點(diǎn):同時(shí)利用多個(gè)CPU,同時(shí)進(jìn)行多個(gè)操作;缺點(diǎn):耗費(fèi)資源(需要開辟多個(gè)內(nèi)存空間) 線程優(yōu)點(diǎn):共享多個(gè)資源,IO操作,創(chuàng)造并發(fā)操作;缺點(diǎn):搶占資源 3.threading模塊 threading模塊對(duì)象 Thread 表示一個(gè)線程的執(zhí)行的對(duì)象 Lock 鎖原語對(duì)象(跟thread模塊里的鎖對(duì)象相同) RLock 可重入鎖對(duì)象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定) Condition條件變量對(duì)象能讓一個(gè)線程停下來,等待其他線程滿足了某個(gè)“條件”。如狀態(tài)的改變或值的改變 Event 通用的條件變量。多個(gè)線程可以等待某個(gè)時(shí)間的發(fā)生,在事件發(fā)生后,所有的線程都被激活 Semaphore 為等待鎖的線程提供一個(gè)類似“等候室”的結(jié)構(gòu) BoundedSemaphore 與Semaphore類似,只是它不允許超過初始值 Timer 與thread類似,只是它要等待一段時(shí)間后才開始運(yùn)行 Barrier 創(chuàng)建一個(gè)障礙,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù) 3.1線程的兩種調(diào)用方式 按 Ctrl+C 復(fù)制代碼 按 Ctrl+C 復(fù)制代碼 #繼承式調(diào)用 import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): #定義每個(gè)線程要運(yùn)行的函數(shù) print("running on number:%s" % self.num) time.sleep(4) if __name__ == '__main__': t1 = MyThread(111) t2 = MyThread(222) t1.start() t2.start() threading的Thread類主要的運(yùn)行對(duì)象 函數(shù) start() 開始線程的執(zhí)行 run() 定義線程的功能的函數(shù)(一般會(huì)被子類重寫) join(timeout=None) 程序掛起,直到線程結(jié)束;如果給了timeout,則最多阻塞timeout秒 getName() 返回線程的名字 setName(name) 設(shè)置線程的名字 isAlive() 布爾標(biāo)志,表示這個(gè)線程是否還在運(yùn)行中 isDaemon() 返回線程的daemon標(biāo)志 setDaemon(daemonic) 把線程的daemon標(biāo)志設(shè)為daemonic(一定要在調(diào)用start()函數(shù)前調(diào)用) 3.2 join and setDaemon 按 Ctrl+C 復(fù)制代碼 按 Ctrl+C 復(fù)制代碼 join()會(huì)等到線程結(jié)束,或者在給了timeout參數(shù)的時(shí)候,等到超時(shí)為止。使用join()比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(也稱“自旋鎖”)。 join()的另一個(gè)比較重要的方法是它可以完全不用調(diào)用。一旦線程啟動(dòng)后,就會(huì)一直運(yùn)行,直到線程的函數(shù)結(jié)束,退出為止。 setDaemon(True):將線程聲明為守護(hù)線程,必須在start() 方法調(diào)用之前設(shè)置, 如果不設(shè)置為守護(hù)線程程序會(huì)被無限掛起。這個(gè)方法基本和join是相反的。當(dāng)我們 在程序運(yùn)行中,執(zhí)行一個(gè)主線程,如果主線程又創(chuàng)建一個(gè)子線程,主線程和子線程 就分兵兩路,分別運(yùn)行,那么當(dāng)主線程完成想退出時(shí),會(huì)檢驗(yàn)子線程是否完成。如 果子線程未完成,則主線程會(huì)等待子線程完成后再退出。但是有時(shí)候我們需要的是 只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時(shí)就可以 用setDaemon方法 3.3 同步鎖Lock 由于線程之間是進(jìn)行隨機(jī)調(diào)度,并且每個(gè)線程可能只執(zhí)行n條執(zhí)行之后,當(dāng)多個(gè)線程同時(shí)修改同一條數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)臟數(shù)據(jù),所以,出現(xiàn)了線程鎖 - 同一時(shí)刻允許一個(gè)線程執(zhí)行操作。 import time import threading def addNum(): global num #在每個(gè)線程中都獲取這個(gè)全局變量 num-=1 temp=num print('--get num:',num ) print('看下結(jié)果是0') print('再看下結(jié)果') time.sleep(0.001) num =temp-1 #對(duì)此公共變量進(jìn)行-1操作 num = 100 #設(shè)定一個(gè)共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執(zhí)行完畢 t.join() print('final num:', num ) 注意: 1: why num-=1沒問題呢?這是因?yàn)閯?dòng)作太快(完成這個(gè)動(dòng)作在切換的時(shí)間內(nèi)) 2: if sleep(1),現(xiàn)象會(huì)更明顯,100個(gè)線程每一個(gè)一定都沒有執(zhí)行完就進(jìn)行了切換,我們說過sleep就等效于IO阻塞,1s之內(nèi)不會(huì)再切換回來,所以最后的結(jié)果一定是99. 多個(gè)線程都在同時(shí)操作同一個(gè)共享資源,所以造成了資源破壞,怎么辦呢? 有同學(xué)會(huì)想用join唄,但join會(huì)把整個(gè)線程給停住,造成了串行,失去了多線程的意義,而我們只需要把計(jì)算(涉及到操作公共數(shù)據(jù))的時(shí)候串行執(zhí)行。 我們可以通過同步鎖來解決這種問題 import time import threading begin=time.time() def addNum(): global num #在每個(gè)線程中都獲取這個(gè)全局變量 # num-=1 lock.acquire() temp=num # print('--get num:',num ) time.sleep(0.0001) num =temp-1 #對(duì)此公共變量進(jìn)行-1操作 lock.release() num = 100 #設(shè)定一個(gè)共享變量 thread_list = [] lock=threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執(zhí)行完畢 t.join() end=time.time() print('final num:', num ) print(end-begin) 同步鎖與GIL的關(guān)系? Python的線程在GIL的控制之下,線程之間,對(duì)整個(gè)python解釋器,對(duì)python提供的C API的訪問都是互斥的,這可以看作是Python內(nèi)核級(jí)的互斥機(jī)制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機(jī)制———用戶級(jí)互斥。內(nèi)核級(jí)通過互斥保護(hù)了內(nèi)核的共享資源,同樣,用戶級(jí)互斥保護(hù)了用戶程序中的共享資源。 GIL 的作用是:對(duì)于一個(gè)解釋器,只能有一個(gè)thread在執(zhí)行bytecode。所以每時(shí)每刻只有一條bytecode在被執(zhí)行一個(gè)thread。GIL保證了bytecode 這層面上是thread safe的。
但是如果你有個(gè)操作比如 x += 1,這個(gè)操作需要多個(gè)bytecodes操作,在執(zhí)行這個(gè)操作的多條bytecodes期間的時(shí)候可能中途就換thread了,這樣就出現(xiàn)了data races的情況了。 Lock(指令鎖)是可用的最低級(jí)的同步指令。Lock處于鎖定狀態(tài)時(shí),不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定, 3.4 線程死鎖和遞歸鎖 在線程間共享多個(gè)資源的時(shí)候,如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源,就會(huì)造成死鎖,因?yàn)橄到y(tǒng)判斷這部分資源都正在使用,所有這兩個(gè)線程在無外力作用下將一直等待下去。 RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念, ![]() 解決辦法:使用遞歸鎖,將 lockA=threading.Lock() lockB=threading.Lock() 改為Rlock ![]() 3.5 單線程和多線程執(zhí)行對(duì)比 ![]() 3.6 條件變量同步(Condition) 有一類線程需要滿足條件之后才能夠繼續(xù)執(zhí)行,Python提供了threading.Condition 對(duì)象用于條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。 lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項(xiàng),不傳人鎖,對(duì)象自動(dòng)創(chuàng)建一個(gè)RLock()。Condition(條件變量)通常與一個(gè)鎖關(guān)聯(lián)。需要在多個(gè)Contidion中共享一個(gè)鎖時(shí),可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例??梢哉J(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。 構(gòu)造方法: Condition([lock/rlock]) ![]() 3.7 事件event python線程的事件用于主線程控制其他線程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。 事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞。
event.isSet():返回event的狀態(tài)值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度; event.clear():恢復(fù)event的狀態(tài)值為False。 #__author: greg #date: 2017/9/18 19:37 import threading import time event = threading.Event() def func(): # 等待事件,進(jìn)入等待阻塞狀態(tài) print('%s wait for event...' % threading.currentThread().getName()) event.wait() # 收到事件后進(jìn)入運(yùn)行狀態(tài) print('%s recv event.' % threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) print('MainThread set event.')# 發(fā)送事件通知 event.set() threading基于Java的線程模型設(shè)計(jì)。鎖(Lock)和條件變量(Condition)在Java中是對(duì)象的基本行為(每一個(gè)對(duì)象都自帶了鎖和條件變量), Semaphore/BoundedSemaphore Semaphore(信號(hào)量)是計(jì)算機(jī)科學(xué)史上最古老的同步指令之一。Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器, 每當(dāng)調(diào)用acquire()時(shí)-1,調(diào)用release() 時(shí)+1。計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí), #__author: greg #date: 2017/9/18 10:02 import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(3) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(10) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start() ![]()
|
|