使用Python中的線程模塊,能夠同時(shí)運(yùn)行程序的不同部分,并簡化設(shè)計(jì)。如果你已經(jīng)入門Python,并且想用線程來提升程序運(yùn)行速度的話,那本文就是為你準(zhǔn)備的! 通過閱讀本文,你將了解到: · 什么是線程? · 如何創(chuàng)建、執(zhí)行線程? · 如何使用線程池ThreadPoolExecutor? · 如何避免資源競爭問題? · 如何使用Python中線程模塊threading提供的常用工具? 目錄 1. 什么是線程 2. 創(chuàng)建線程 2.1. 守護(hù)線程 2.2. 加入線程 3. 多線程 4. 線程池 5. 競態(tài)條件 5.1. 單線程 5.2. 兩個(gè)線程 5.3. 示例的意義 6. 同步鎖 7. 死鎖 8. 生產(chǎn)者-消費(fèi)者模型中的線程 8.1 在生產(chǎn)者-消費(fèi)者模型中使用鎖 8.2 在生產(chǎn)者-消費(fèi)者模型中使用隊(duì)列 9. 線程對象 9.1 信號量 9.2 定時(shí)器 9.3 柵欄 閱讀提醒: 已掌握Python基本知識; 使用Python 3.6以上版本運(yùn)行。 1. 什么是線程 線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線程并行執(zhí)行不同的任務(wù)。 在Python3中實(shí)現(xiàn)的大部分運(yùn)行任務(wù)里,不同的線程實(shí)際上并沒有同時(shí)運(yùn)行:它們只是看起來像是同時(shí)運(yùn)行的。 大家很容易認(rèn)為線程化是在程序上運(yùn)行兩個(gè)(或多個(gè))不同的處理器,每個(gè)處理器同時(shí)執(zhí)行一個(gè)獨(dú)立的任務(wù)。這種理解并不完全正確,線程可能會在不同的處理器上運(yùn)行,但一次只能運(yùn)行一個(gè)線程。 同時(shí)執(zhí)行多個(gè)任務(wù)需要使用非標(biāo)準(zhǔn)的Python運(yùn)行方式:用不同的語言編寫一部分代碼,或者使用多進(jìn)程模塊multiprocessing,但這么做會帶來一些額外的開銷。 由于Python默認(rèn)的運(yùn)行環(huán)境是CPython(C語言開發(fā)的Python),所以線程化可能不會提升所有任務(wù)的運(yùn)行速度。這是因?yàn)楹虶IL(Global Interpreter Lock)的交互形成了限制:一次只能運(yùn)行一個(gè)Python線程。 線程化的一般替代方法是:讓各項(xiàng)任務(wù)花費(fèi)大量時(shí)間等待外部事件。但問題是,如果想縮短等待時(shí)間,會需要大量的CPU計(jì)算,結(jié)果是程序的運(yùn)行速度可能并不會提升。 當(dāng)代碼是用Python語言編寫并在默認(rèn)執(zhí)行環(huán)境CPython上運(yùn)行時(shí),會出現(xiàn)這種情況。如果線程代碼是用C語言寫的,那它們就能夠釋放GIL并同時(shí)運(yùn)行。如果是在別的Python執(zhí)行環(huán)境(如IPython, PyPy,Jython, IronPython)上運(yùn)行,請參考相關(guān)文檔了解它們是如何處理線程的。 如果只用Python語言在默認(rèn)的Python執(zhí)行環(huán)境下運(yùn)行,并且遇到CPU受限的問題,那就應(yīng)該用多進(jìn)程模塊multiprocessing來解決。 在程序中使用線程也可以簡化設(shè)計(jì)。本文中的大部分示例并不保證可以提升程序運(yùn)行速度,其目的是使設(shè)計(jì)結(jié)構(gòu)更加清晰、便于邏輯推理。 下面就來看看如何使用線程吧! 2. 創(chuàng)建線程 既然已經(jīng)對什么是線程有了初步了解,下面讓我們來學(xué)習(xí)如何創(chuàng)建一個(gè)線程。 Python標(biāo)準(zhǔn)庫提供了threading模塊,里面包含將在本文中介紹的大部分基本模塊。在這個(gè)模塊中,Thread類很好地封裝了有關(guān)線程的子類,為我們提供了干凈的接口來使用它們。 要啟動一個(gè)線程,需要?jiǎng)?chuàng)建一個(gè)Thread實(shí)例,然后調(diào)用.start()方法: import logging 查看日志語句,可以看到__main__部分正在創(chuàng)建并啟動線程:
創(chuàng)建線程時(shí),我們需要傳遞兩個(gè)參數(shù),第一個(gè)參數(shù)target是函數(shù)名,指定這個(gè)線程去哪個(gè)函數(shù)里面去執(zhí)行代碼,第二個(gè)參數(shù)args是一個(gè)元組類型,指定為這個(gè)函數(shù)傳遞的參數(shù)。在本例中,Thread運(yùn)行函數(shù)thread_function(),并將1作為參數(shù)傳遞給該函數(shù)。 在本文中,我們用連續(xù)整數(shù)為線程命名。雖然threading.get_ident()方法能夠?yàn)槊恳粋€(gè)線程生成唯一的名稱,但這些名稱通常會比較長,而且可讀性差。 這里的thread_function()函數(shù)本身沒做什么,它只是簡單地記錄了一些信息,并用time.sleep()隔開。 運(yùn)行程序(注釋掉倒數(shù)第二行代碼),結(jié)果如下: $ ./single_thread.py 可以看到,線程Thread在__main__部分代碼運(yùn)行完后才結(jié)束。下一節(jié)會對這一現(xiàn)象做出解釋,并討論被注釋掉那行代碼。 2.1. 守護(hù)線程 在計(jì)算機(jī)科學(xué)中,守護(hù)進(jìn)程daemon是一類在后臺運(yùn)行的特殊進(jìn)程,用于執(zhí)行特定的系統(tǒng)任務(wù)。 守護(hù)進(jìn)程daemon在Python線程模塊threading中有著特殊的含義。當(dāng)程序退出時(shí),守護(hù)線程將立即關(guān)閉??梢赃@么理解,守護(hù)線程是一個(gè)在后臺運(yùn)行,且不用費(fèi)心去關(guān)閉它的線程,因?yàn)樗鼤S程序自動關(guān)閉。 如果程序運(yùn)行的線程是非守護(hù)線程,那么程序?qū)⒌却芯€程結(jié)束后再終止。但如果運(yùn)行的是守護(hù)線程,當(dāng)程序退出時(shí),守護(hù)線程會被自動殺死。 我們仔細(xì)研究一下上面程序運(yùn)行的結(jié)果,注意看最后兩行。當(dāng)運(yùn)行程序時(shí),在__main__部分打印完all done信息后、線程結(jié)束前,有一個(gè)大約2秒的停頓。 這時(shí),Python在等待非守護(hù)線程完成運(yùn)行。當(dāng)Python程序結(jié)束時(shí),關(guān)閉過程的一部分是清理線程。 查看Python線程模塊的源代碼,可以看到thread ._shutdown()方法遍歷所有正在運(yùn)行的線程,并在每個(gè)非守護(hù)線程上調(diào)用.join()函數(shù),檢查它們是否已經(jīng)結(jié)束運(yùn)行。 因此,程序退出時(shí)需要等待,因?yàn)槭刈o(hù)線程本身會在休眠中等待其他非守護(hù)線程運(yùn)行結(jié)束。一旦thread ._shutdown()運(yùn)行完畢并打印出信息,程序就可以退出。 守護(hù)線程這種自動退出的特性很實(shí)用,但其實(shí)還有其他的方法能實(shí)現(xiàn)相同的功能。我們先用守護(hù)線程重復(fù)運(yùn)行一下上面的程序,看看結(jié)果。只需在創(chuàng)建線程時(shí),添加參數(shù)daemon=True。
現(xiàn)在運(yùn)行程序,結(jié)果如下: $ ./single_thread.py 添加參數(shù)daemon=True前
添加參數(shù)daemon=True后 不同的地方是,之前輸出的最后一行不見了,說明thread_function()函數(shù)沒有機(jī)會完成運(yùn)行。這是一個(gè)守護(hù)線程,所以當(dāng)__main__部分運(yùn)行完最后一行代碼,程序終止,守護(hù)線程被殺死。 2.2. 加入一個(gè)線程 守護(hù)線程用起來很方便,但如果想讓守護(hù)線程運(yùn)行完畢后再結(jié)束程序該怎么辦?或者想讓守護(hù)線程運(yùn)行完后不退出程序呢? 讓我們來看一下剛剛注釋掉的那行代碼: # x.join() 要讓一個(gè)線程等待另一個(gè)線程完成,可以調(diào)用.join()函數(shù)。如果取消對這行代碼的注釋,主線程將會暫停,等待線程x完成運(yùn)行。 這個(gè)功能在守護(hù)線程和非守護(hù)線程上同樣適用。如果用.join()函數(shù)加入了一個(gè)線程,則主線程將一直等待,直到被加入的線程運(yùn)行完成。 3. 多線程 到目前為止,示例代碼中只用到了兩個(gè)線程:主線程和一個(gè)threading.Thread線程對象。 通常,我們希望同時(shí)啟動多個(gè)線程,讓它們執(zhí)行不同的任務(wù)。先來看看比較復(fù)雜的創(chuàng)建多線程的方法,然后再看簡單的。 這個(gè)復(fù)雜的創(chuàng)建方法其實(shí)前面已經(jīng)展示過了:
這段代碼和前面提到的創(chuàng)建單線程時(shí)的結(jié)構(gòu)是一樣的,創(chuàng)建線程對象,然后調(diào)用.start()方法。程序中會保存一個(gè)包含多個(gè)線程對象的列表,為稍后使用.join()函數(shù)做準(zhǔn)備。 多次運(yùn)行這段代碼可能會產(chǎn)生一些有趣的結(jié)果: Main : create and start thread 0. 仔細(xì)看一下輸出結(jié)果,三個(gè)線程都按照預(yù)想的順序創(chuàng)建0,1,2,但它們的結(jié)束順序卻是相反的!多次運(yùn)行將會生成不同的順序。查看線程Thread x: finish中的信息,可以知道每個(gè)線程都在何時(shí)完成。 線程的運(yùn)行順序是由操作系統(tǒng)決定的,并且很難預(yù)測。很有可能每次運(yùn)行所得到的順序都不一樣,所以在用線程設(shè)計(jì)算法時(shí)需要注意這一點(diǎn)。 幸運(yùn)的是,Python中提供了幾個(gè)基礎(chǔ)模塊,可以用來協(xié)調(diào)線程并讓它們一起運(yùn)行。在介紹這部分內(nèi)容之前,讓我們先看看如何更簡單地創(chuàng)建一組線程。 4. 線程池 我們可以用一種更簡單的方法來創(chuàng)建一組線程:線程池ThreadPoolExecutor,它是Python中concurrent.futures標(biāo)準(zhǔn)庫的一部分。(Python 3.2 以上版本適用)。 最簡單的方式是把它創(chuàng)建成上下文管理器,并使用with語句管理線程池的創(chuàng)建和銷毀。 用ThreadPoolExecutor重寫上例中的__main__部分,代碼如下:
這段代碼創(chuàng)建一個(gè)線程池ThreadPoolExecutor作為上下文管理器,并傳入需要的工作線程數(shù)量。然后使用.map()遍歷可迭代對象,本例中是range(3),每個(gè)對象生成池中的一個(gè)線程。 在with模塊的結(jié)尾,會讓線程池ThreadPoolExecutor對池中的每個(gè)線程調(diào)用.join()。強(qiáng)烈建議使用線程池ThreadPoolExecutor作為上下文管理器,因?yàn)檫@樣就不會忘記寫.join()。 注: 使用線程池ThreadPoolExecutor可能會報(bào)一些奇怪的錯(cuò)誤。例如,調(diào)用一個(gè)沒有參數(shù)的函數(shù),但將參數(shù)傳入.map()時(shí),線程將拋出異常。 不幸的是,線程池ThreadPoolExecutor會隱藏該異常,程序會在沒有任何輸出的情況下終止。剛開始調(diào)試時(shí),這會讓人很頭疼。 運(yùn)行修改后的示例代碼,結(jié)果如下: $ ./executor.py 再提醒一下,這里的線程1在線程0之前完成,這是因?yàn)榫€程的調(diào)度是由操作系統(tǒng)決定的,并不遵循一個(gè)特定的順序。 5. 競態(tài)條件 在繼續(xù)介紹Python線程模塊的一些其他特性之前,讓我們先討論一下在編寫線程化程序時(shí)會遇到的一個(gè)更頭疼的問題: 競態(tài)條件。 我們先了解一下競態(tài)條件的含義,然后看一個(gè)實(shí)例,再繼續(xù)學(xué)習(xí)標(biāo)準(zhǔn)庫提供的其他模塊,來防止競態(tài)條件的發(fā)生。 當(dāng)兩個(gè)或多個(gè)線程訪問共享的數(shù)據(jù)或資源時(shí),可能會出現(xiàn)競態(tài)條件。在本例中,我們創(chuàng)建了一個(gè)每次都會發(fā)生的大型競態(tài)條件,但請注意,大多數(shù)競態(tài)條件不會如此頻繁發(fā)生。通常情況下,它們很少發(fā)生,但一旦發(fā)生,會很難進(jìn)行調(diào)試。 在本例中,我們會寫一個(gè)更新數(shù)據(jù)庫的類,但這里并不需要一個(gè)真正的數(shù)據(jù)庫,只是一個(gè)虛擬的,因?yàn)檫@不是本文討論的重點(diǎn)。 這個(gè)FakeDatabase類包括.__init__()和.update()方法。
FakeDatabase類會一直跟蹤一個(gè)值: .value,它是共享數(shù)據(jù),這里會出現(xiàn)競態(tài)條件。 .__init__()方法將.value的值初始化為0。.update()方法從數(shù)據(jù)庫中讀取一個(gè)值,對其進(jìn)行一些計(jì)算,然后將新值寫回?cái)?shù)據(jù)庫。 FakeDatabase類的使用實(shí)例如下: if __name__ == '__main__': 該程序創(chuàng)建一個(gè)線程池ThreadPoolExecutor,里面包含兩個(gè)線程,然后在每個(gè)線程上調(diào)用.submit()方法,告訴它們運(yùn)行database.update()函數(shù)。 .submit()允許將位置參數(shù)和關(guān)鍵字參數(shù)傳遞給正在線程中運(yùn)行的函數(shù):
示例代碼中,index作為唯一一個(gè)位置參數(shù)傳遞給database.update()函數(shù),后面會介紹,也可以用類似的方式傳遞多個(gè)參數(shù)。 由于每個(gè)線程都會運(yùn)行.update(), 讓.value的變量值加上1,所以最后打印出的database.value值應(yīng)該是2。但如果是這樣的話,舉這個(gè)例子就沒有什么意義了。 實(shí)際上,運(yùn)行上面這段代碼的輸出如下: $ ./racecond.py 我們來仔細(xì)研究一下這里究竟發(fā)生了什么,有助于更好地理解有關(guān)這個(gè)問題的解決方案。 5.1. 單線程 在深入研究上面有關(guān)兩個(gè)線程的問題之前,我們先回過頭看一下線程到底是如何工作的。 這里不會討論所有的細(xì)節(jié),因?yàn)樵谀壳斑@個(gè)學(xué)習(xí)階段還沒必要掌握這么多內(nèi)容。我們還將簡化一些東西,雖然可能在技術(shù)上不夠精確,但可以方便大家理解其中的原理。 當(dāng)線程池ThreadPoolExecutor運(yùn)行每個(gè)線程時(shí),我們會指定運(yùn)行哪個(gè)函數(shù),以及傳遞給該函數(shù)的參數(shù):executor.submit(database.update, index),這里是指運(yùn)行database.update函數(shù),并傳入index參數(shù)。 這么做的結(jié)果是,線程池中的每個(gè)線程都將調(diào)用database.update(index)。注意,主線程__main__中創(chuàng)建的database是對FakeDatabase對象的引用。在這個(gè)對象上調(diào)用.update(),會調(diào)用該對象的實(shí)例方法。 每個(gè)線程都將引用同一個(gè)FakeDatabase對象:database。每個(gè)線程還有一個(gè)獨(dú)特的index值,使得日志語句更易閱讀: 當(dāng)線程開始運(yùn)行.update()函數(shù)時(shí),它擁有局部變量local_copy。這絕對是一件好事,否則,運(yùn)行相同函數(shù)的兩個(gè)線程總是會相互混淆。也就是說,函數(shù)內(nèi)定義的局部變量是線程安全的。 現(xiàn)在我們可以看一下,如果使用單線程、調(diào)用一次.update()函數(shù)運(yùn)行上面的程序會發(fā)生什么。 下圖展示了在只運(yùn)行一個(gè)線程的情況下,.update()函數(shù)是如何逐步執(zhí)行的。代碼顯示在左上角,后面跟著一張圖,顯示線程中局部變量local_value和共享數(shù)據(jù)database.value的值: 這張圖是這樣布局的,從上至下時(shí)間增加,它以創(chuàng)建線程1開始,并在線程1終止時(shí)結(jié)束。 線程1啟動時(shí),FakeDatabase.value的值為0。第一行代碼將值0復(fù)制給局部變量local_copy。接下來,local_copy += 1語句讓local_copy的值增加1,可以看到線程1中的.value值變成了1。 然后調(diào)用time.sleep()方法,暫停當(dāng)前線程,并允許其他線程運(yùn)行。因?yàn)楸纠兄挥幸粋€(gè)線程,這里沒什么影響。 當(dāng)線程1被喚醒繼續(xù)運(yùn)行時(shí),它將新值從局部變量local_copy復(fù)制到FakeDatabase.value,線程完成運(yùn)行。可以看到database.value的值被設(shè)為1。 到目前為止,一切順利。我們運(yùn)行了一次.update()函數(shù),FakeDatabase.value值增加到1。 5.2. 兩個(gè)線程 回到競態(tài)條件,這兩個(gè)線程會并發(fā)運(yùn)行,但不會同時(shí)運(yùn)行。它們都有各自的局部變量local_copy,并指向相同的database對象。正是database這個(gè)共享數(shù)據(jù)導(dǎo)致了這些問題。 程序創(chuàng)建線程1,運(yùn)行update()函數(shù): 當(dāng)線程1調(diào)用time.sleep()方法時(shí),它允許另一個(gè)線程開始運(yùn)行。這時(shí),線程2啟動并執(zhí)行相同的操作。它也將database.value的值復(fù)制給私有變量local_copy,但共享數(shù)據(jù)database.value的值還未更新,仍為0: 當(dāng)線程2進(jìn)入休眠狀態(tài)時(shí),共享數(shù)據(jù)database.value的值還是未被修改的0,而且兩個(gè)線程中的私有變量local_copy的值都是1。 現(xiàn)在線程1被喚醒并保存其私有變量local_copy的值,然后終止,線程2繼續(xù)運(yùn)行。線程2在休眠的時(shí)候并不知道線程1已經(jīng)運(yùn)行完畢并更新了database.value中的值,當(dāng)繼續(xù)運(yùn)行時(shí), 它將自己私有變量local_copy的值存儲到database.value中,也是1。 這兩個(gè)線程交錯(cuò)訪問同一個(gè)共享對象,覆蓋了彼此的結(jié)果。當(dāng)一個(gè)線程釋放內(nèi)存或在另一個(gè)線程完成訪問之前關(guān)閉文件句柄時(shí),可能會出現(xiàn)類似的競爭條件。 5.3. 示例的意義 上面的例子是為了確保每次運(yùn)行程序時(shí)都發(fā)生競態(tài)條件。因?yàn)椴僮飨到y(tǒng)可以在任何時(shí)候交換出一個(gè)線程,所以有可能在讀取了x的值之后,像x = x + 1這樣的語句會中斷,導(dǎo)致寫回?cái)?shù)據(jù)庫的值不是我們想要的。 這一過程中的細(xì)節(jié)非常有趣,但本文剩下部分的學(xué)習(xí)不需要了解具體細(xì)節(jié),所以可以先跳過。 看完有關(guān)競態(tài)條件的實(shí)例,讓我們接下來看看如何解決它們! 6. 同步鎖 有很多方法可以避免或解決競態(tài)條件,這里不會介紹所有的解決方法,但會提到一些會經(jīng)常用到的。讓我們先從鎖Lock開始學(xué)習(xí)。 要解決上述競態(tài)條件問題,需要找到一種方法,每次只允許一個(gè)線程進(jìn)入代碼的read-modify-write部分。最常用就是Python中的鎖。在一些其他語言中,同樣的思想被稱為互斥鎖mutex?;コ怄imutex屬于進(jìn)程互斥MUTual EXclusion的一部分,它和鎖所做的工作是一樣的。 鎖是一種類似于通行證的東西,每次只有一個(gè)線程可以擁有鎖,任何其他想要獲得鎖的線程必須等待,直到該鎖的所有者將它釋放出來。 完成此任務(wù)的基本函數(shù)是.acquire()和.release()。線程將調(diào)用my_lock.acquire()來獲取鎖。如果鎖已經(jīng)存在,則調(diào)用線程將會等待,直到鎖被釋放。這里有一點(diǎn)很重要,如果一個(gè)線程獲得了鎖,但從未釋放,程序會被卡住。稍后會介紹更多關(guān)于這方面的內(nèi)容。 幸運(yùn)的是,Python的鎖也將作為上下文管理器運(yùn)行,所以可以在with語句中使用它,并且當(dāng)with模塊出于任何原因退出時(shí),鎖會自動釋放。 讓我們看看添加了鎖的FakeDatabase,調(diào)用函數(shù)保持不變:
除了添加一些調(diào)試日志以便更清楚地查看鎖的運(yùn)行之外,這里最大的變化是添加了一個(gè)叫._lock的成員,它是一個(gè)thread . lock()對象。這個(gè)._lock在未鎖定狀態(tài)下被初始化,并由with語句鎖定和釋放。 值得注意的是,運(yùn)行該函數(shù)的線程將一直持有這個(gè)鎖,直到它完全更新完數(shù)據(jù)庫。在本例中,這意味著它將在復(fù)制、更新、休眠并將值寫回?cái)?shù)據(jù)庫的整個(gè)過程中持有鎖。 日志設(shè)置為警告級別,運(yùn)行程序,結(jié)果如下: $ ./fixrace.py 在主線程__main__中配置完日志輸出后,將日志級別設(shè)置為DEBUG可以打開完整的日志:
用調(diào)試日志運(yùn)行程序的結(jié)果如下: $ ./fixrace.py 線程0獲得鎖,并且在它進(jìn)入睡眠狀態(tài)時(shí)仍然持有鎖。然后線程1啟動并嘗試獲取同一個(gè)鎖,因?yàn)榫€程0仍然持有它,線程1就必須等待。這就是互斥鎖。 本文其余部分的許多示例都有警告和調(diào)試級別的日志記錄。我們通常只顯示警告級別的輸出,因?yàn)檎{(diào)試日志可能非常長。 7. 死鎖 在繼續(xù)學(xué)習(xí)之前,我們先看一下使用鎖時(shí)會出現(xiàn)的常見問題。在上例中,如果鎖已經(jīng)被某個(gè)線程獲取,那么第二次調(diào)用.acquire()時(shí)將一直等待,直到持有鎖的線程調(diào)用.release()將鎖釋放。 思考一下,運(yùn)行下面這段代碼會得到什么結(jié)果:
當(dāng)程序第二次調(diào)用l.acquire()時(shí),它需要等待鎖被釋放。在本例中,可以刪除第二次調(diào)用修復(fù)死鎖,但是死鎖通常在以下兩種情況下會發(fā)生: ① 鎖沒有被正確釋放時(shí)會產(chǎn)生運(yùn)行錯(cuò)誤; ② 在一個(gè)實(shí)用程序函數(shù)需要被其他函數(shù)調(diào)用的地方會出現(xiàn)設(shè)計(jì)問題,這些函數(shù)可能已經(jīng)擁有或者沒有鎖。 第一種情況有時(shí)會發(fā)生,但是使用鎖作為上下文管理器可以大大減少這種情況發(fā)生的頻率。建議充分利用上下文管理器來編寫代碼,因?yàn)樗鼈冇兄诒苊獬霈F(xiàn)異常跳過.release()調(diào)用的情況。 在某些語言中,設(shè)計(jì)問題可能有點(diǎn)棘手。慶幸的是,Python的線程模塊還提供了另一個(gè)鎖對象RLock。它允許線程在調(diào)用.release()之前多次獲取.acquire()鎖,且程序不會阻塞。該線程仍需要保證.release()和.acquire()的調(diào)用次數(shù)相同,但它是用了另一種方式而已。 Lock和RLock是線程化編程中用來防止競爭條件的兩個(gè)基本工具,還有一些其他的工具。在研究它們之前,我們先轉(zhuǎn)移到一個(gè)稍微不同的領(lǐng)域。 8. 生產(chǎn)者-消費(fèi)者模型中的線程 生產(chǎn)者-消費(fèi)者模型是一個(gè)標(biāo)準(zhǔn)的計(jì)算機(jī)科學(xué)領(lǐng)域的問題,用于解決線程同步或進(jìn)程同步。我們先介紹一個(gè)它的變形,大致了解一下Python中的線程模塊提供了哪些基礎(chǔ)模塊。 本例中,假設(shè)需要寫一個(gè)從網(wǎng)絡(luò)讀取消息并將其寫入磁盤的程序。該程序不會主動請求消息,它必須在消息傳入時(shí)偵聽并接受它們。而且這些消息不會以固定的速度傳入,而是以突發(fā)的方式傳入。這一部分程序叫做生產(chǎn)者。 另一方面,一旦傳入了消息,就需要將其寫入數(shù)據(jù)庫。數(shù)據(jù)庫訪問很慢,但訪問速度足以跟上消息傳入的平均速度。但當(dāng)大量消息同時(shí)傳入時(shí),速度會跟不上。這部分程序叫消費(fèi)者。 在生產(chǎn)者和消費(fèi)者之間,需要?jiǎng)?chuàng)建一個(gè)管道Pipeline,隨著對不同同步對象的深入了解,我們需要對管道里面的內(nèi)容進(jìn)行修改。 這就是基本的框架。讓我們看看使用Lock的解決方案。雖然它并不是最佳的解決方法,但它運(yùn)用的是前面已經(jīng)介紹過的工具,所以比較容易理解。 8.1. 在生產(chǎn)者-消費(fèi)者模型中使用鎖 既然這是一篇關(guān)于Python線程的文章,而且剛剛已經(jīng)閱讀了有關(guān)鎖的內(nèi)容,所以讓我們嘗試用鎖解決競態(tài)條件問題。 先寫一個(gè)生產(chǎn)者線程,從虛擬網(wǎng)絡(luò)中讀取消息并放入管道中: SENTINEL = object() 生產(chǎn)者獲得一個(gè)介于1到100之間的隨機(jī)數(shù),作為生成的虛擬消息。它調(diào)用管道上的.set_message()方法將其發(fā)送給消費(fèi)者。 生產(chǎn)者還用一個(gè)SENTINEL值來警告消費(fèi)者,在它發(fā)送10個(gè)值之后停止。這有點(diǎn)奇怪,但不必?fù)?dān)心,在完成本示例后,會介紹如何去掉這個(gè)SENTINEL值。 管道pipeline的另一端是消費(fèi)者:
消費(fèi)者從管道中讀取一條消息并將其寫入虛擬數(shù)據(jù)庫,在本例中,只是將其儲存到磁盤中。如果消費(fèi)者獲取了SENTINEL值,線程會終止。 在研究管道Pipeline之前,先看一下生成這些線程的主線程__main__部分: if __name__ == '__main__': 看起來應(yīng)該很熟悉,因?yàn)樗颓懊媸纠薪榻B過的__main__部分類似。 注意,打開調(diào)試日志可以查看所有的日志消息,方法是取消對這一行的注釋:
我們有必要遍歷調(diào)試日志消息,來查看每個(gè)線程是在何處獲得和釋放鎖的。 現(xiàn)在讓我們看一下將消息從生產(chǎn)者傳遞給消費(fèi)者的管道Pipeline: class Pipeline: 好長一段代碼!別害怕,大部分是日志語句,刪除所有日志語句后的代碼如下:
這樣看起來更清晰,管道類中有三個(gè)成員: ① .message存儲要傳遞的消息; ② .producer_lock是一個(gè)線程鎖對象,限制生產(chǎn)者線程對消息的訪問; ③.consumer_lock也是一個(gè)線程鎖,限制消費(fèi)者線程對消息的訪問。 __init__() 初始化這三個(gè)成員,然后在.consumer_lock上調(diào)用.acquire(),消費(fèi)者獲得鎖。生產(chǎn)者可以添加新消息,但消費(fèi)者需要等待消息出現(xiàn)。 get_message()和.set_messages()幾乎是相反的操作。.get_message()在consumer_lock上調(diào)用.acquire(),這么做的目的是讓消費(fèi)者等待,直到有消息傳入。 一旦消費(fèi)者獲得了鎖.consumer_lock,它會將self.message的值復(fù)制給.message,然后在.producer_lock上調(diào)用.release()。釋放此鎖允許生產(chǎn)者在管道中插入下一條消息。 .get_message()函數(shù)中有一些細(xì)節(jié)很容易被忽略。大家思考一下,為什么不把message變量刪掉,直接返回self.message的值呢? 答案如下。 只要消費(fèi)者調(diào)用.producer_lock.release(),它就被交換出去,生產(chǎn)者開始運(yùn)行,這可能發(fā)生在鎖被完全釋放之前!也就是說,存在一種微小的可能性,當(dāng)函數(shù)返回self.message時(shí),這個(gè)值是生產(chǎn)者生成的下一條消息,導(dǎo)致第一條消息丟失。這是競態(tài)條件的另一個(gè)例子。 我們繼續(xù)看事務(wù)的另一端:.set_message()。生產(chǎn)者通過傳入一條消息來調(diào)用該函數(shù),獲得鎖.producer_lock,傳入.message值,然后調(diào)用consumer_lock.release()釋放鎖,這將允許消費(fèi)者讀取該值。 運(yùn)行代碼,日志設(shè)置為警告級別,結(jié)果如下: $ ./prodcom_lock.py 大家可能會覺得奇怪,生產(chǎn)者在消費(fèi)者還沒運(yùn)行之前就獲得了兩條消息?;剡^頭仔細(xì)看一下生產(chǎn)者和.set_message()函數(shù),生產(chǎn)者先獲取消息,打印出日志語句,然后試圖將消息放入管道中,這時(shí)才需要等待鎖。 當(dāng)生產(chǎn)者試圖傳入第二條消息時(shí),它會第二次調(diào)用.set_message(),發(fā)生阻塞。 操作系統(tǒng)可以在任何時(shí)候交換線程,但它通常會允許每個(gè)線程在交換之前有一段合理的運(yùn)行時(shí)間。這就是為什么生產(chǎn)者會一直運(yùn)行,直到第二次調(diào)用.set_message()時(shí)被阻塞。 一旦線程被阻塞,操作系統(tǒng)總是會把它交換出去,并找到另一個(gè)線程去運(yùn)行。在本例中,就是消費(fèi)者線程。 消費(fèi)者調(diào)用.get_message()函數(shù),它讀取消息并在.producer_lock上調(diào)用.release()方法,釋放鎖,允許生產(chǎn)者再次運(yùn)行。 注意,第一個(gè)值是43,正是消費(fèi)者所讀取的值,雖然生產(chǎn)者已經(jīng)生成了新值45。 盡管使用鎖的這種方法適用于本例,但對于常見的生產(chǎn)者-消費(fèi)者模式問題,這不是一個(gè)很好的解決方法,因?yàn)樗淮沃辉试S管道中有一個(gè)值。當(dāng)生產(chǎn)者收到大量值時(shí),將無處安放。 讓我們繼續(xù)看一個(gè)更好的解決方法:使用隊(duì)列Queue. 8.2. 在生產(chǎn)者-消費(fèi)者模型中使用隊(duì)列 如果想在管道中一次處理多個(gè)值,我們需要為管道提供一個(gè)數(shù)據(jù)結(jié)構(gòu),當(dāng)從生產(chǎn)者線程備份數(shù)據(jù)時(shí),該結(jié)構(gòu)允許管道中的數(shù)據(jù)量靈活變動,不再是單一值。 Python標(biāo)準(zhǔn)庫中有一個(gè)模塊叫隊(duì)列queue,里面有一個(gè)類叫Queue。讓我們用隊(duì)列Queue改寫一下上面受鎖保護(hù)的管道。 此外,我們還會介紹另一種停止工作線程的方法,使用Python線程模塊中的事件Event對象。 事件的觸發(fā)機(jī)制可以是多種多樣的。在本例中,主線程只是休眠一段時(shí)間,然后調(diào)用event.set()方法,通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運(yùn)行狀態(tài):
這里惟一的變化是在第8行創(chuàng)建了事件對象event,在第10行和第11行傳遞了event參數(shù),代碼的最后一個(gè)部分13-15行,先休眠0.1秒,記錄一條消息,然后在事件上調(diào)用.set()方法。 生產(chǎn)者也不用變太多: def producer(pipeline, event): 在第3行循環(huán)部分設(shè)置了事件,而且也不用再把SENTINEL值放入管道中。 消費(fèi)者的變化稍多:
除了需要?jiǎng)h掉和SENTINEL值相關(guān)的代碼,還要執(zhí)行稍微復(fù)雜一點(diǎn)的循環(huán)條件。它會一直循環(huán),直到事件結(jié)束,管道中的數(shù)據(jù)被清空。 一定要確保當(dāng)消費(fèi)者退出時(shí),隊(duì)列是空的。如果消費(fèi)者在管道包含消息時(shí)退出,可能會出現(xiàn)兩個(gè)問題。一是會丟失那部分?jǐn)?shù)據(jù),但更嚴(yán)重的是生產(chǎn)者會被鎖住。 在生產(chǎn)者檢查.is_set()條件后、但在調(diào)用pipeline.set_message()前觸發(fā)事件,則會發(fā)生這種情況。 一旦發(fā)生這種情況,生產(chǎn)者可能被喚醒并退出,但此時(shí)鎖仍被消費(fèi)者持有。然后,生產(chǎn)者將嘗試用.acquire()方法獲取鎖,但是消費(fèi)者已經(jīng)退出,而且永遠(yuǎn)不會釋放鎖,所以生產(chǎn)者就會一直等下去。 消費(fèi)者的其余部分看起來應(yīng)該很熟悉。 管道類的寫法變化最大: class Pipeline(queue.Queue): Pipeline是queue.Queue的一個(gè)子類。Queue隊(duì)列里面有一個(gè)可選參數(shù),在初始化時(shí)指定隊(duì)列所能容納的最大數(shù)據(jù)量。 .get_message()和.set_message()變得更簡短,被隊(duì)列中的.get()和.put()方法替代。 大家可能想知道,防止競爭條件的代碼都跑哪里去了? 編寫標(biāo)準(zhǔn)庫的核心開發(fā)人員知道,在多線程環(huán)境中經(jīng)常使用隊(duì)列Queue,因此將所有鎖定代碼合并到了隊(duì)列Queue模塊內(nèi)部。隊(duì)列Queue本身就是線程安全的。 程序運(yùn)行結(jié)果如下:
生產(chǎn)者創(chuàng)建了5條消息,并將其中4條放到隊(duì)列中。但在放置第5條消息之前,它被操作系統(tǒng)交換出去了。 然后消費(fèi)者開始運(yùn)行并儲存第1條消息,打印出該消息和隊(duì)列大?。?/p> 這就是為什么第5條消息沒有成功進(jìn)入管道。刪除一條消息后,隊(duì)列的大小縮減到3個(gè)。因?yàn)殛?duì)列最多可以容納10條消息,所以生產(chǎn)者線程沒有被隊(duì)列阻塞,而是被操作系統(tǒng)交換出去了。 注意:每次運(yùn)行所得到的結(jié)果會不同。這就是使用線程的樂趣所在! 當(dāng)程序開始結(jié)束時(shí),主線程觸發(fā)事件,生產(chǎn)者立即退出。但消費(fèi)者仍有很多工作要做,所以它會繼續(xù)運(yùn)行,直到清理完管道中的數(shù)據(jù)為止。 嘗試修改生產(chǎn)者或消費(fèi)者中的隊(duì)列大小和time.sleep()中的休眠時(shí)間,來分別模擬更長的網(wǎng)絡(luò)或磁盤訪問時(shí)間。即使是輕微的更改,也會對結(jié)果產(chǎn)生很大的影響。 對于生產(chǎn)者-消費(fèi)者模型,這是一個(gè)更好的解決方法,但其實(shí)可以進(jìn)一步簡化。去掉管道Pipeline和日志語句,就只剩下和queue.Queue相關(guān)的語句了。 直接使用queue.Queue的最終代碼如下:
可以看到,使用Python的內(nèi)置基礎(chǔ)模塊能夠簡化復(fù)雜的問題,讓代碼閱讀起來更清晰。 鎖Lock和隊(duì)列Queue是解決并發(fā)問題非常方便的兩個(gè)類,但其實(shí)標(biāo)準(zhǔn)庫還提供了其他類。在結(jié)束本教程之前,讓我們快速瀏覽一下還有哪些類。 9. 線程對象 Python的線程threading模塊還有其他一些基本類型。雖然在上面的例子中沒有用到,但它們會在不同的情況下派上用場,所以熟悉一下還是很好處的。 9.1 信號量 首先要介紹的是信號量thread.semaphore,信號量是具有一些特殊屬性的計(jì)數(shù)器。 第一個(gè)屬性是計(jì)數(shù)的原子性,可以保證操作系統(tǒng)不會在計(jì)數(shù)器遞增或遞減的過程中交換線程。 內(nèi)部計(jì)數(shù)器在調(diào)用.release()時(shí)遞增,在調(diào)用.acquire()時(shí)遞減。 另一個(gè)特殊屬性是,如果線程在計(jì)數(shù)器為0時(shí)調(diào)用.acquire(),那么該線程將阻塞,直到另一個(gè)線程調(diào)用.release()并將計(jì)數(shù)器的值增加到1。 信號量通常用于保護(hù)容量有限的資源。例如,我們有一個(gè)連接池,并且希望限制該連接池中的元素?cái)?shù)量,就可以用信號量來進(jìn)行管理。 9.2 定時(shí)器 threading.Timer是一個(gè)定時(shí)器功能的類,指定函數(shù)在間隔特定時(shí)間后執(zhí)行任務(wù)。我們可以通過傳入需要等待的時(shí)間和函數(shù)來創(chuàng)建一個(gè)定時(shí)器: t = threading.Timer(30.0, my_function) 調(diào)用.start()啟動定時(shí)器,函數(shù)將在指定時(shí)間過后的某個(gè)時(shí)間點(diǎn)上被新線程調(diào)用。但請注意,這里并不能保證函數(shù)會在我們所期望的確切時(shí)間被調(diào)用,可能會存在誤差。 如果想要停止已經(jīng)啟動的定時(shí)器,可以調(diào)用.cancel()。在定時(shí)器觸發(fā)后調(diào)用.cancel()不會執(zhí)行任何操作,也不會產(chǎn)生異常。 定時(shí)器可用于在特定時(shí)間之后提示用戶執(zhí)行操作。如果用戶在定時(shí)器過時(shí)之前執(zhí)行了操作,可以調(diào)用.cancel()取消定時(shí)。 9.3 柵欄 threading模塊中的柵欄Barrier可以用來指定需要同步運(yùn)行的線程數(shù)量。創(chuàng)建柵欄Barrier時(shí),我們必須指定所需同步的線程數(shù)。每個(gè)線程都會在Barrier上調(diào)用.wait()方法,它們會先保持阻塞狀態(tài),直到等待的線程數(shù)量達(dá)到指定值時(shí),會被同時(shí)釋放。 注意,線程是由操作系統(tǒng)調(diào)度的,因此,即使所有線程同時(shí)被釋放,一次也只能運(yùn)行一個(gè)線程。 柵欄可以用來初始化一個(gè)線程池。讓線程初始化后在柵欄里等待,可以確保程序在所有線程都完成初始化后再開始運(yùn)行。 米哥點(diǎn)評 感謝Little monster同學(xué)的翻譯和整理。本篇對線程及多線程開發(fā)進(jìn)行了很好的詮釋,從基礎(chǔ)介紹到線程實(shí)例,從入門到進(jìn)階,全方位多角度講解有關(guān)線程開發(fā)方方面面的內(nèi)容,即便是工作了多年的老程序員,看完之后也是收獲頗多。多線程在數(shù)據(jù)采集和處理方面都有不少的應(yīng)用場景,相信對很多Tushare用戶會有所助益,寄以此篇促大家學(xué)好用好,提升數(shù)據(jù)效率。 |
|