日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

你真的懂線程嗎?史上最全Python線程解析

 立志德美 2019-04-29

使用Python中的線程模塊,能夠同時(shí)運(yùn)行程序的不同部分,并簡化設(shè)計(jì)。如果你已經(jīng)入門Python,并且想用線程來提升程序運(yùn)行速度的話,那本文就是為你準(zhǔn)備的!

通過閱讀本文,你將了解到:

· 什么是線程?

· 如何創(chuàng)建、執(zhí)行線程?

· 如何使用線程池ThreadPoolExecutor?

· 如何避免資源競爭問題?

· 如何使用Python中線程模塊threading提供的常用工具?

目錄

1. 什么是線程

2. 創(chuàng)建線程

    2.1. 守護(hù)線程

    2.2. 加入線程

3. 多線程

4. 線程池

5. 競態(tài)條件

    5.1. 單線程

    5.2. 兩個(gè)線程

    5.3. 示例的意義

6. 同步鎖

7. 死鎖

8. 生產(chǎn)者-消費(fèi)者模型中的線程

    8.1 在生產(chǎn)者-消費(fèi)者模型中使用鎖

    8.2 在生產(chǎn)者-消費(fèi)者模型中使用隊(duì)列

9. 線程對象

    9.1 信號量

    9.2 定時(shí)器

    9.3 柵欄

閱讀提醒:

已掌握Python基本知識;

使用Python 3.6以上版本運(yùn)行。

1. 什么是線程

線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線程并行執(zhí)行不同的任務(wù)。

在Python3中實(shí)現(xiàn)的大部分運(yùn)行任務(wù)里,不同的線程實(shí)際上并沒有同時(shí)運(yùn)行:它們只是看起來像是同時(shí)運(yùn)行的。

大家很容易認(rèn)為線程化是在程序上運(yùn)行兩個(gè)(或多個(gè))不同的處理器,每個(gè)處理器同時(shí)執(zhí)行一個(gè)獨(dú)立的任務(wù)。這種理解并不完全正確,線程可能會在不同的處理器上運(yùn)行,但一次只能運(yùn)行一個(gè)線程。

同時(shí)執(zhí)行多個(gè)任務(wù)需要使用非標(biāo)準(zhǔn)的Python運(yùn)行方式:用不同的語言編寫一部分代碼,或者使用多進(jìn)程模塊multiprocessing,但這么做會帶來一些額外的開銷。

由于Python默認(rèn)的運(yùn)行環(huán)境是CPython(C語言開發(fā)的Python),所以線程化可能不會提升所有任務(wù)的運(yùn)行速度。這是因?yàn)楹虶IL(Global Interpreter Lock)的交互形成了限制:一次只能運(yùn)行一個(gè)Python線程。

線程化的一般替代方法是:讓各項(xiàng)任務(wù)花費(fèi)大量時(shí)間等待外部事件。但問題是,如果想縮短等待時(shí)間,會需要大量的CPU計(jì)算,結(jié)果是程序的運(yùn)行速度可能并不會提升。

當(dāng)代碼是用Python語言編寫并在默認(rèn)執(zhí)行環(huán)境CPython上運(yùn)行時(shí),會出現(xiàn)這種情況。如果線程代碼是用C語言寫的,那它們就能夠釋放GIL并同時(shí)運(yùn)行。如果是在別的Python執(zhí)行環(huán)境(如IPython, PyPy,Jython, IronPython)上運(yùn)行,請參考相關(guān)文檔了解它們是如何處理線程的。

如果只用Python語言在默認(rèn)的Python執(zhí)行環(huán)境下運(yùn)行,并且遇到CPU受限的問題,那就應(yīng)該用多進(jìn)程模塊multiprocessing來解決。

在程序中使用線程也可以簡化設(shè)計(jì)。本文中的大部分示例并不保證可以提升程序運(yùn)行速度,其目的是使設(shè)計(jì)結(jié)構(gòu)更加清晰、便于邏輯推理。

下面就來看看如何使用線程吧!

2. 創(chuàng)建線程

既然已經(jīng)對什么是線程有了初步了解,下面讓我們來學(xué)習(xí)如何創(chuàng)建一個(gè)線程。

Python標(biāo)準(zhǔn)庫提供了threading模塊,里面包含將在本文中介紹的大部分基本模塊。在這個(gè)模塊中,Thread類很好地封裝了有關(guān)線程的子類,為我們提供了干凈的接口來使用它們。

要啟動一個(gè)線程,需要?jiǎng)?chuàng)建一個(gè)Thread實(shí)例,然后調(diào)用.start()方法:

import logging
import threading
import time


def thread_function(name):
    logging.info('Thread %s: starting', name)
    time.sleep(2)
    logging.info('Thread %s: finishing', name)


if __name__ == '__main__':
     format = '%(asctime)s: %(message)s'
     logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')

     logging.info('Main    : before creating thread')
     x = threading.Thread(target=thread_function, args=(1,))
     logging.info('Main    : before running thread')
     x.start()
     logging.info('Main    : wait for the thread to finish')
     # x.join()
     logging.info('Main    : all done')

查看日志語句,可以看到__main__部分正在創(chuàng)建并啟動線程:

x = threading.Thread(target=thread_function, args=(1,))
x.start()

創(chuàng)建線程時(shí),我們需要傳遞兩個(gè)參數(shù),第一個(gè)參數(shù)target是函數(shù)名,指定這個(gè)線程去哪個(gè)函數(shù)里面去執(zhí)行代碼,第二個(gè)參數(shù)args是一個(gè)元組類型,指定為這個(gè)函數(shù)傳遞的參數(shù)。在本例中,Thread運(yùn)行函數(shù)thread_function(),并將1作為參數(shù)傳遞給該函數(shù)。

在本文中,我們用連續(xù)整數(shù)為線程命名。雖然threading.get_ident()方法能夠?yàn)槊恳粋€(gè)線程生成唯一的名稱,但這些名稱通常會比較長,而且可讀性差。

這里的thread_function()函數(shù)本身沒做什么,它只是簡單地記錄了一些信息,并用time.sleep()隔開。

運(yùn)行程序(注釋掉倒數(shù)第二行代碼),結(jié)果如下:

$ ./single_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing

可以看到,線程Thread__main__部分代碼運(yùn)行完后才結(jié)束。下一節(jié)會對這一現(xiàn)象做出解釋,并討論被注釋掉那行代碼。

2.1. 守護(hù)線程

在計(jì)算機(jī)科學(xué)中,守護(hù)進(jìn)程daemon是一類在后臺運(yùn)行的特殊進(jìn)程,用于執(zhí)行特定的系統(tǒng)任務(wù)。

守護(hù)進(jìn)程daemon在Python線程模塊threading中有著特殊的含義。當(dāng)程序退出時(shí),守護(hù)線程將立即關(guān)閉??梢赃@么理解,守護(hù)線程是一個(gè)在后臺運(yùn)行,且不用費(fèi)心去關(guān)閉它的線程,因?yàn)樗鼤S程序自動關(guān)閉。

如果程序運(yùn)行的線程是非守護(hù)線程,那么程序?qū)⒌却芯€程結(jié)束后再終止。但如果運(yùn)行的是守護(hù)線程,當(dāng)程序退出時(shí),守護(hù)線程會被自動殺死。

我們仔細(xì)研究一下上面程序運(yùn)行的結(jié)果,注意看最后兩行。當(dāng)運(yùn)行程序時(shí),在__main__部分打印完all done信息后、線程結(jié)束前,有一個(gè)大約2秒的停頓。

這時(shí),Python在等待非守護(hù)線程完成運(yùn)行。當(dāng)Python程序結(jié)束時(shí),關(guān)閉過程的一部分是清理線程。

查看Python線程模塊的源代碼,可以看到thread ._shutdown()方法遍歷所有正在運(yùn)行的線程,并在每個(gè)非守護(hù)線程上調(diào)用.join()函數(shù),檢查它們是否已經(jīng)結(jié)束運(yùn)行。

因此,程序退出時(shí)需要等待,因?yàn)槭刈o(hù)線程本身會在休眠中等待其他非守護(hù)線程運(yùn)行結(jié)束。一旦thread ._shutdown()運(yùn)行完畢并打印出信息,程序就可以退出。

守護(hù)線程這種自動退出的特性很實(shí)用,但其實(shí)還有其他的方法能實(shí)現(xiàn)相同的功能。我們先用守護(hù)線程重復(fù)運(yùn)行一下上面的程序,看看結(jié)果。只需在創(chuàng)建線程時(shí),添加參數(shù)daemon=True

x = threading.Thread(target=thread_function, args=(1,), daemon=True)

現(xiàn)在運(yùn)行程序,結(jié)果如下:

$ ./single_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing

添加參數(shù)daemon=True前

$ ./daemon_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done

添加參數(shù)daemon=True后

不同的地方是,之前輸出的最后一行不見了,說明thread_function()函數(shù)沒有機(jī)會完成運(yùn)行。這是一個(gè)守護(hù)線程,所以當(dāng)__main__部分運(yùn)行完最后一行代碼,程序終止,守護(hù)線程被殺死。

2.2. 加入一個(gè)線程

守護(hù)線程用起來很方便,但如果想讓守護(hù)線程運(yùn)行完畢后再結(jié)束程序該怎么辦?或者想讓守護(hù)線程運(yùn)行完后不退出程序呢?

讓我們來看一下剛剛注釋掉的那行代碼:

# x.join()

要讓一個(gè)線程等待另一個(gè)線程完成,可以調(diào)用.join()函數(shù)。如果取消對這行代碼的注釋,主線程將會暫停,等待線程x完成運(yùn)行。

這個(gè)功能在守護(hù)線程和非守護(hù)線程上同樣適用。如果用.join()函數(shù)加入了一個(gè)線程,則主線程將一直等待,直到被加入的線程運(yùn)行完成。

3. 多線程

到目前為止,示例代碼中只用到了兩個(gè)線程:主線程和一個(gè)threading.Thread線程對象。

通常,我們希望同時(shí)啟動多個(gè)線程,讓它們執(zhí)行不同的任務(wù)。先來看看比較復(fù)雜的創(chuàng)建多線程的方法,然后再看簡單的。

這個(gè)復(fù)雜的創(chuàng)建方法其實(shí)前面已經(jīng)展示過了:

import logging
import threading
import time

def thread_function(name):
    logging.info('Thread %s: starting', name)
    time.sleep(2)
    logging.info('Thread %s: finishing', name)

if __name__ == '__main__':
    format = '%(asctime)s: %(message)s'
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')

    threads = list()
    for index in range(3):
        logging.info('Main    : create and start thread %d.', index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info('Main    : before joining thread %d.', index)
        thread.join()
        logging.info('Main    : thread %d done', index)

這段代碼和前面提到的創(chuàng)建單線程時(shí)的結(jié)構(gòu)是一樣的,創(chuàng)建線程對象,然后調(diào)用.start()方法。程序中會保存一個(gè)包含多個(gè)線程對象的列表,為稍后使用.join()函數(shù)做準(zhǔn)備。

多次運(yùn)行這段代碼可能會產(chǎn)生一些有趣的結(jié)果:

Main    : create and start thread 0.
Thread 0: starting
Main    : create and start thread 1.
Thread 1: starting
Main    : create and start thread 2.
Thread 2: starting
Main    : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main    : thread 0 done
Main    : before joining thread 1.
Main    : thread 1 done
Main    : before joining thread 2.
Main    : thread 2 done

仔細(xì)看一下輸出結(jié)果,三個(gè)線程都按照預(yù)想的順序創(chuàng)建0,1,2,但它們的結(jié)束順序卻是相反的!多次運(yùn)行將會生成不同的順序。查看線程Thread x: finish中的信息,可以知道每個(gè)線程都在何時(shí)完成。

線程的運(yùn)行順序是由操作系統(tǒng)決定的,并且很難預(yù)測。很有可能每次運(yùn)行所得到的順序都不一樣,所以在用線程設(shè)計(jì)算法時(shí)需要注意這一點(diǎn)。

幸運(yùn)的是,Python中提供了幾個(gè)基礎(chǔ)模塊,可以用來協(xié)調(diào)線程并讓它們一起運(yùn)行。在介紹這部分內(nèi)容之前,讓我們先看看如何更簡單地創(chuàng)建一組線程。

4. 線程池

我們可以用一種更簡單的方法來創(chuàng)建一組線程:線程池ThreadPoolExecutor,它是Python中concurrent.futures標(biāo)準(zhǔn)庫的一部分。(Python 3.2 以上版本適用)。

最簡單的方式是把它創(chuàng)建成上下文管理器,并使用with語句管理線程池的創(chuàng)建和銷毀。

ThreadPoolExecutor重寫上例中的__main__部分,代碼如下:

import concurrent.futures

# [rest of code]

if __name__ == '__main__':
    format = '%(asctime)s: %(message)s'
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

這段代碼創(chuàng)建一個(gè)線程池ThreadPoolExecutor作為上下文管理器,并傳入需要的工作線程數(shù)量。然后使用.map()遍歷可迭代對象,本例中是range(3),每個(gè)對象生成池中的一個(gè)線程。

with模塊的結(jié)尾,會讓線程池ThreadPoolExecutor對池中的每個(gè)線程調(diào)用.join()。強(qiáng)烈建議使用線程池ThreadPoolExecutor作為上下文管理器,因?yàn)檫@樣就不會忘記寫.join()。

注:

使用線程池ThreadPoolExecutor可能會報(bào)一些奇怪的錯(cuò)誤。例如,調(diào)用一個(gè)沒有參數(shù)的函數(shù),但將參數(shù)傳入.map()時(shí),線程將拋出異常。

不幸的是,線程池ThreadPoolExecutor會隱藏該異常,程序會在沒有任何輸出的情況下終止。剛開始調(diào)試時(shí),這會讓人很頭疼。

運(yùn)行修改后的示例代碼,結(jié)果如下:

$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing

再提醒一下,這里的線程1在線程0之前完成,這是因?yàn)榫€程的調(diào)度是由操作系統(tǒng)決定的,并不遵循一個(gè)特定的順序。

5. 競態(tài)條件

在繼續(xù)介紹Python線程模塊的一些其他特性之前,讓我們先討論一下在編寫線程化程序時(shí)會遇到的一個(gè)更頭疼的問題: 競態(tài)條件。

我們先了解一下競態(tài)條件的含義,然后看一個(gè)實(shí)例,再繼續(xù)學(xué)習(xí)標(biāo)準(zhǔn)庫提供的其他模塊,來防止競態(tài)條件的發(fā)生。

當(dāng)兩個(gè)或多個(gè)線程訪問共享的數(shù)據(jù)或資源時(shí),可能會出現(xiàn)競態(tài)條件。在本例中,我們創(chuàng)建了一個(gè)每次都會發(fā)生的大型競態(tài)條件,但請注意,大多數(shù)競態(tài)條件不會如此頻繁發(fā)生。通常情況下,它們很少發(fā)生,但一旦發(fā)生,會很難進(jìn)行調(diào)試。

在本例中,我們會寫一個(gè)更新數(shù)據(jù)庫的類,但這里并不需要一個(gè)真正的數(shù)據(jù)庫,只是一個(gè)虛擬的,因?yàn)檫@不是本文討論的重點(diǎn)。

這個(gè)FakeDatabase類包括.__init__().update()方法。

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info('Thread %s: starting update', name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info('Thread %s: finishing update', name)

FakeDatabase類會一直跟蹤一個(gè)值: .value,它是共享數(shù)據(jù),這里會出現(xiàn)競態(tài)條件。

.__init__()方法將.value的值初始化為0。.update()方法從數(shù)據(jù)庫中讀取一個(gè)值,對其進(jìn)行一些計(jì)算,然后將新值寫回?cái)?shù)據(jù)庫。

FakeDatabase類的使用實(shí)例如下:

if __name__ == '__main__':
    format = '%(asctime)s: %(message)s'
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')

    database = FakeDatabase()
    logging.info('Testing update. Starting value is %d.', database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info('Testing update. Ending value is %d.', database.value)

該程序創(chuàng)建一個(gè)線程池ThreadPoolExecutor,里面包含兩個(gè)線程,然后在每個(gè)線程上調(diào)用.submit()方法,告訴它們運(yùn)行database.update()函數(shù)。

.submit()允許將位置參數(shù)和關(guān)鍵字參數(shù)傳遞給正在線程中運(yùn)行的函數(shù):

.submit(function, *args, **kwargs)

示例代碼中,index作為唯一一個(gè)位置參數(shù)傳遞給database.update()函數(shù),后面會介紹,也可以用類似的方式傳遞多個(gè)參數(shù)。

由于每個(gè)線程都會運(yùn)行.update(), 讓.value的變量值加上1,所以最后打印出的database.value值應(yīng)該是2。但如果是這樣的話,舉這個(gè)例子就沒有什么意義了。

實(shí)際上,運(yùn)行上面這段代碼的輸出如下:

$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.

我們來仔細(xì)研究一下這里究竟發(fā)生了什么,有助于更好地理解有關(guān)這個(gè)問題的解決方案。

5.1. 單線程

在深入研究上面有關(guān)兩個(gè)線程的問題之前,我們先回過頭看一下線程到底是如何工作的。

這里不會討論所有的細(xì)節(jié),因?yàn)樵谀壳斑@個(gè)學(xué)習(xí)階段還沒必要掌握這么多內(nèi)容。我們還將簡化一些東西,雖然可能在技術(shù)上不夠精確,但可以方便大家理解其中的原理。

當(dāng)線程池ThreadPoolExecutor運(yùn)行每個(gè)線程時(shí),我們會指定運(yùn)行哪個(gè)函數(shù),以及傳遞給該函數(shù)的參數(shù):executor.submit(database.update, index),這里是指運(yùn)行database.update函數(shù),并傳入index參數(shù)。

這么做的結(jié)果是,線程池中的每個(gè)線程都將調(diào)用database.update(index)。注意,主線程__main__中創(chuàng)建的database是對FakeDatabase對象的引用。在這個(gè)對象上調(diào)用.update(),會調(diào)用該對象的實(shí)例方法。

每個(gè)線程都將引用同一個(gè)FakeDatabase對象:database。每個(gè)線程還有一個(gè)獨(dú)特的index值,使得日志語句更易閱讀:

當(dāng)線程開始運(yùn)行.update()函數(shù)時(shí),它擁有局部變量local_copy。這絕對是一件好事,否則,運(yùn)行相同函數(shù)的兩個(gè)線程總是會相互混淆。也就是說,函數(shù)內(nèi)定義的局部變量是線程安全的。

現(xiàn)在我們可以看一下,如果使用單線程、調(diào)用一次.update()函數(shù)運(yùn)行上面的程序會發(fā)生什么。

下圖展示了在只運(yùn)行一個(gè)線程的情況下,.update()函數(shù)是如何逐步執(zhí)行的。代碼顯示在左上角,后面跟著一張圖,顯示線程中局部變量local_value和共享數(shù)據(jù)database.value的值:

這張圖是這樣布局的,從上至下時(shí)間增加,它以創(chuàng)建線程1開始,并在線程1終止時(shí)結(jié)束。

線程1啟動時(shí),FakeDatabase.value的值為0。第一行代碼將值0復(fù)制給局部變量local_copy。接下來,local_copy += 1語句讓local_copy的值增加1,可以看到線程1中的.value值變成了1。

然后調(diào)用time.sleep()方法,暫停當(dāng)前線程,并允許其他線程運(yùn)行。因?yàn)楸纠兄挥幸粋€(gè)線程,這里沒什么影響。

當(dāng)線程1被喚醒繼續(xù)運(yùn)行時(shí),它將新值從局部變量local_copy復(fù)制到FakeDatabase.value,線程完成運(yùn)行。可以看到database.value的值被設(shè)為1。

到目前為止,一切順利。我們運(yùn)行了一次.update()函數(shù),FakeDatabase.value值增加到1。

5.2. 兩個(gè)線程

回到競態(tài)條件,這兩個(gè)線程會并發(fā)運(yùn)行,但不會同時(shí)運(yùn)行。它們都有各自的局部變量local_copy,并指向相同的database對象。正是database這個(gè)共享數(shù)據(jù)導(dǎo)致了這些問題。

程序創(chuàng)建線程1,運(yùn)行update()函數(shù):

當(dāng)線程1調(diào)用time.sleep()方法時(shí),它允許另一個(gè)線程開始運(yùn)行。這時(shí),線程2啟動并執(zhí)行相同的操作。它也將database.value的值復(fù)制給私有變量local_copy,但共享數(shù)據(jù)database.value的值還未更新,仍為0:

當(dāng)線程2進(jìn)入休眠狀態(tài)時(shí),共享數(shù)據(jù)database.value的值還是未被修改的0,而且兩個(gè)線程中的私有變量local_copy的值都是1。

現(xiàn)在線程1被喚醒并保存其私有變量local_copy的值,然后終止,線程2繼續(xù)運(yùn)行。線程2在休眠的時(shí)候并不知道線程1已經(jīng)運(yùn)行完畢并更新了database.value中的值,當(dāng)繼續(xù)運(yùn)行時(shí), 它將自己私有變量local_copy的值存儲到database.value中,也是1。

這兩個(gè)線程交錯(cuò)訪問同一個(gè)共享對象,覆蓋了彼此的結(jié)果。當(dāng)一個(gè)線程釋放內(nèi)存或在另一個(gè)線程完成訪問之前關(guān)閉文件句柄時(shí),可能會出現(xiàn)類似的競爭條件。

5.3. 示例的意義

上面的例子是為了確保每次運(yùn)行程序時(shí)都發(fā)生競態(tài)條件。因?yàn)椴僮飨到y(tǒng)可以在任何時(shí)候交換出一個(gè)線程,所以有可能在讀取了x的值之后,像x = x + 1這樣的語句會中斷,導(dǎo)致寫回?cái)?shù)據(jù)庫的值不是我們想要的。

這一過程中的細(xì)節(jié)非常有趣,但本文剩下部分的學(xué)習(xí)不需要了解具體細(xì)節(jié),所以可以先跳過。

看完有關(guān)競態(tài)條件的實(shí)例,讓我們接下來看看如何解決它們!

6. 同步鎖

有很多方法可以避免或解決競態(tài)條件,這里不會介紹所有的解決方法,但會提到一些會經(jīng)常用到的。讓我們先從鎖Lock開始學(xué)習(xí)。

要解決上述競態(tài)條件問題,需要找到一種方法,每次只允許一個(gè)線程進(jìn)入代碼的read-modify-write部分。最常用就是Python中的鎖。在一些其他語言中,同樣的思想被稱為互斥鎖mutex?;コ怄imutex屬于進(jìn)程互斥MUTual EXclusion的一部分,它和鎖所做的工作是一樣的。

鎖是一種類似于通行證的東西,每次只有一個(gè)線程可以擁有鎖,任何其他想要獲得鎖的線程必須等待,直到該鎖的所有者將它釋放出來。

完成此任務(wù)的基本函數(shù)是.acquire().release()。線程將調(diào)用my_lock.acquire()來獲取鎖。如果鎖已經(jīng)存在,則調(diào)用線程將會等待,直到鎖被釋放。這里有一點(diǎn)很重要,如果一個(gè)線程獲得了鎖,但從未釋放,程序會被卡住。稍后會介紹更多關(guān)于這方面的內(nèi)容。

幸運(yùn)的是,Python的鎖也將作為上下文管理器運(yùn)行,所以可以在with語句中使用它,并且當(dāng)with模塊出于任何原因退出時(shí),鎖會自動釋放。

讓我們看看添加了鎖的FakeDatabase,調(diào)用函數(shù)保持不變:

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info('Thread %s: starting update', name)
        logging.debug('Thread %s about to lock', name)
        with self._lock:
            logging.debug('Thread %s has lock', name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug('Thread %s about to release lock', name)
        logging.debug('Thread %s after release', name)
        logging.info('Thread %s: finishing update', name)

除了添加一些調(diào)試日志以便更清楚地查看鎖的運(yùn)行之外,這里最大的變化是添加了一個(gè)叫._lock的成員,它是一個(gè)thread . lock()對象。這個(gè)._lock在未鎖定狀態(tài)下被初始化,并由with語句鎖定和釋放。

值得注意的是,運(yùn)行該函數(shù)的線程將一直持有這個(gè)鎖,直到它完全更新完數(shù)據(jù)庫。在本例中,這意味著它將在復(fù)制、更新、休眠并將值寫回?cái)?shù)據(jù)庫的整個(gè)過程中持有鎖。

日志設(shè)置為警告級別,運(yùn)行程序,結(jié)果如下:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.

在主線程__main__中配置完日志輸出后,將日志級別設(shè)置為DEBUG可以打開完整的日志:

logging.getLogger().setLevel(logging.DEBUG)

用調(diào)試日志運(yùn)行程序的結(jié)果如下:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.

線程0獲得鎖,并且在它進(jìn)入睡眠狀態(tài)時(shí)仍然持有鎖。然后線程1啟動并嘗試獲取同一個(gè)鎖,因?yàn)榫€程0仍然持有它,線程1就必須等待。這就是互斥鎖。

本文其余部分的許多示例都有警告和調(diào)試級別的日志記錄。我們通常只顯示警告級別的輸出,因?yàn)檎{(diào)試日志可能非常長。

7. 死鎖

在繼續(xù)學(xué)習(xí)之前,我們先看一下使用鎖時(shí)會出現(xiàn)的常見問題。在上例中,如果鎖已經(jīng)被某個(gè)線程獲取,那么第二次調(diào)用.acquire()時(shí)將一直等待,直到持有鎖的線程調(diào)用.release()將鎖釋放。

思考一下,運(yùn)行下面這段代碼會得到什么結(jié)果:

import threading

l = threading.Lock()
print('before first acquire')
l.acquire()
print('before second acquire')
l.acquire()
print('acquired lock twice')

當(dāng)程序第二次調(diào)用l.acquire()時(shí),它需要等待鎖被釋放。在本例中,可以刪除第二次調(diào)用修復(fù)死鎖,但是死鎖通常在以下兩種情況下會發(fā)生:

① 鎖沒有被正確釋放時(shí)會產(chǎn)生運(yùn)行錯(cuò)誤;

② 在一個(gè)實(shí)用程序函數(shù)需要被其他函數(shù)調(diào)用的地方會出現(xiàn)設(shè)計(jì)問題,這些函數(shù)可能已經(jīng)擁有或者沒有鎖。

第一種情況有時(shí)會發(fā)生,但是使用鎖作為上下文管理器可以大大減少這種情況發(fā)生的頻率。建議充分利用上下文管理器來編寫代碼,因?yàn)樗鼈冇兄诒苊獬霈F(xiàn)異常跳過.release()調(diào)用的情況。

在某些語言中,設(shè)計(jì)問題可能有點(diǎn)棘手。慶幸的是,Python的線程模塊還提供了另一個(gè)鎖對象RLock。它允許線程在調(diào)用.release()之前多次獲取.acquire()鎖,且程序不會阻塞。該線程仍需要保證.release().acquire()的調(diào)用次數(shù)相同,但它是用了另一種方式而已。

LockRLock是線程化編程中用來防止競爭條件的兩個(gè)基本工具,還有一些其他的工具。在研究它們之前,我們先轉(zhuǎn)移到一個(gè)稍微不同的領(lǐng)域。

8. 生產(chǎn)者-消費(fèi)者模型中的線程

生產(chǎn)者-消費(fèi)者模型是一個(gè)標(biāo)準(zhǔn)的計(jì)算機(jī)科學(xué)領(lǐng)域的問題,用于解決線程同步或進(jìn)程同步。我們先介紹一個(gè)它的變形,大致了解一下Python中的線程模塊提供了哪些基礎(chǔ)模塊。

本例中,假設(shè)需要寫一個(gè)從網(wǎng)絡(luò)讀取消息并將其寫入磁盤的程序。該程序不會主動請求消息,它必須在消息傳入時(shí)偵聽并接受它們。而且這些消息不會以固定的速度傳入,而是以突發(fā)的方式傳入。這一部分程序叫做生產(chǎn)者。

另一方面,一旦傳入了消息,就需要將其寫入數(shù)據(jù)庫。數(shù)據(jù)庫訪問很慢,但訪問速度足以跟上消息傳入的平均速度。但當(dāng)大量消息同時(shí)傳入時(shí),速度會跟不上。這部分程序叫消費(fèi)者。

在生產(chǎn)者和消費(fèi)者之間,需要?jiǎng)?chuàng)建一個(gè)管道Pipeline,隨著對不同同步對象的深入了解,我們需要對管道里面的內(nèi)容進(jìn)行修改。

這就是基本的框架。讓我們看看使用Lock的解決方案。雖然它并不是最佳的解決方法,但它運(yùn)用的是前面已經(jīng)介紹過的工具,所以比較容易理解。

8.1. 在生產(chǎn)者-消費(fèi)者模型中使用鎖

既然這是一篇關(guān)于Python線程的文章,而且剛剛已經(jīng)閱讀了有關(guān)鎖的內(nèi)容,所以讓我們嘗試用鎖解決競態(tài)條件問題。

先寫一個(gè)生產(chǎn)者線程,從虛擬網(wǎng)絡(luò)中讀取消息并放入管道中:

SENTINEL = object()

def producer(pipeline):
    '''Pretend we're getting a message from the network.'''
    for index in range(10):
        message = random.randint(1, 101)
        logging.info('Producer got message: %s', message)
        pipeline.set_message(message, 'Producer')

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, 'Producer')

生產(chǎn)者獲得一個(gè)介于1到100之間的隨機(jī)數(shù),作為生成的虛擬消息。它調(diào)用管道上的.set_message()方法將其發(fā)送給消費(fèi)者。

生產(chǎn)者還用一個(gè)SENTINEL值來警告消費(fèi)者,在它發(fā)送10個(gè)值之后停止。這有點(diǎn)奇怪,但不必?fù)?dān)心,在完成本示例后,會介紹如何去掉這個(gè)SENTINEL值。

管道pipeline的另一端是消費(fèi)者:

def consumer(pipeline):
    '''Pretend we're saving a number in the database.'''
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message('Consumer')
        if message is not SENTINEL:
            logging.info('Consumer storing message: %s', message)

消費(fèi)者從管道中讀取一條消息并將其寫入虛擬數(shù)據(jù)庫,在本例中,只是將其儲存到磁盤中。如果消費(fèi)者獲取了SENTINEL值,線程會終止。

在研究管道Pipeline之前,先看一下生成這些線程的主線程__main__部分:

if __name__ == '__main__':
    format = '%(asctime)s: %(message)s'
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

看起來應(yīng)該很熟悉,因?yàn)樗颓懊媸纠薪榻B過的__main__部分類似。

注意,打開調(diào)試日志可以查看所有的日志消息,方法是取消對這一行的注釋:

# logging.getLogger().setLevel(logging.DEBUG)

我們有必要遍歷調(diào)試日志消息,來查看每個(gè)線程是在何處獲得和釋放鎖的。

現(xiàn)在讓我們看一下將消息從生產(chǎn)者傳遞給消費(fèi)者的管道Pipeline:

class Pipeline:
    '''
    Class to allow a single element pipeline between producer and consumer.
    '''
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug('%s:about to acquire getlock', name)
        self.consumer_lock.acquire()
        logging.debug('%s:have getlock', name)
        message = self.message
        logging.debug('%s:about to release setlock', name)
        self.producer_lock.release()
        logging.debug('%s:setlock released', name)
        return message

    def set_message(self, message, name):
        logging.debug('%s:about to acquire setlock', name)
        self.producer_lock.acquire()
        logging.debug('%s:have setlock', name)
        self.message = message
        logging.debug('%s:about to release getlock', name)
        self.consumer_lock.release()
        logging.debug('%s:getlock released', name)

好長一段代碼!別害怕,大部分是日志語句,刪除所有日志語句后的代碼如下:

class Pipeline:
    '''
    Class to allow a single element pipeline between producer and consumer.
    '''
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()

這樣看起來更清晰,管道類中有三個(gè)成員:

.message存儲要傳遞的消息;

.producer_lock是一個(gè)線程鎖對象,限制生產(chǎn)者線程對消息的訪問;

.consumer_lock也是一個(gè)線程鎖,限制消費(fèi)者線程對消息的訪問。

__init__() 初始化這三個(gè)成員,然后在.consumer_lock上調(diào)用.acquire(),消費(fèi)者獲得鎖。生產(chǎn)者可以添加新消息,但消費(fèi)者需要等待消息出現(xiàn)。

get_message().set_messages()幾乎是相反的操作。.get_message()consumer_lock上調(diào)用.acquire(),這么做的目的是讓消費(fèi)者等待,直到有消息傳入。

一旦消費(fèi)者獲得了鎖.consumer_lock,它會將self.message的值復(fù)制給.message,然后在.producer_lock上調(diào)用.release()。釋放此鎖允許生產(chǎn)者在管道中插入下一條消息。

.get_message()函數(shù)中有一些細(xì)節(jié)很容易被忽略。大家思考一下,為什么不把message變量刪掉,直接返回self.message的值呢?

答案如下。

只要消費(fèi)者調(diào)用.producer_lock.release(),它就被交換出去,生產(chǎn)者開始運(yùn)行,這可能發(fā)生在鎖被完全釋放之前!也就是說,存在一種微小的可能性,當(dāng)函數(shù)返回self.message時(shí),這個(gè)值是生產(chǎn)者生成的下一條消息,導(dǎo)致第一條消息丟失。這是競態(tài)條件的另一個(gè)例子。

我們繼續(xù)看事務(wù)的另一端:.set_message()。生產(chǎn)者通過傳入一條消息來調(diào)用該函數(shù),獲得鎖.producer_lock,傳入.message值,然后調(diào)用consumer_lock.release()釋放鎖,這將允許消費(fèi)者讀取該值。

運(yùn)行代碼,日志設(shè)置為警告級別,結(jié)果如下:

$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22

大家可能會覺得奇怪,生產(chǎn)者在消費(fèi)者還沒運(yùn)行之前就獲得了兩條消息?;剡^頭仔細(xì)看一下生產(chǎn)者和.set_message()函數(shù),生產(chǎn)者先獲取消息,打印出日志語句,然后試圖將消息放入管道中,這時(shí)才需要等待鎖。

當(dāng)生產(chǎn)者試圖傳入第二條消息時(shí),它會第二次調(diào)用.set_message(),發(fā)生阻塞。

操作系統(tǒng)可以在任何時(shí)候交換線程,但它通常會允許每個(gè)線程在交換之前有一段合理的運(yùn)行時(shí)間。這就是為什么生產(chǎn)者會一直運(yùn)行,直到第二次調(diào)用.set_message()時(shí)被阻塞。

一旦線程被阻塞,操作系統(tǒng)總是會把它交換出去,并找到另一個(gè)線程去運(yùn)行。在本例中,就是消費(fèi)者線程。

消費(fèi)者調(diào)用.get_message()函數(shù),它讀取消息并在.producer_lock上調(diào)用.release()方法,釋放鎖,允許生產(chǎn)者再次運(yùn)行。

注意,第一個(gè)值是43,正是消費(fèi)者所讀取的值,雖然生產(chǎn)者已經(jīng)生成了新值45。

盡管使用鎖的這種方法適用于本例,但對于常見的生產(chǎn)者-消費(fèi)者模式問題,這不是一個(gè)很好的解決方法,因?yàn)樗淮沃辉试S管道中有一個(gè)值。當(dāng)生產(chǎn)者收到大量值時(shí),將無處安放。

讓我們繼續(xù)看一個(gè)更好的解決方法:使用隊(duì)列Queue.

8.2. 在生產(chǎn)者-消費(fèi)者模型中使用隊(duì)列

如果想在管道中一次處理多個(gè)值,我們需要為管道提供一個(gè)數(shù)據(jù)結(jié)構(gòu),當(dāng)從生產(chǎn)者線程備份數(shù)據(jù)時(shí),該結(jié)構(gòu)允許管道中的數(shù)據(jù)量靈活變動,不再是單一值。

Python標(biāo)準(zhǔn)庫中有一個(gè)模塊叫隊(duì)列queue,里面有一個(gè)類叫Queue。讓我們用隊(duì)列Queue改寫一下上面受鎖保護(hù)的管道。

此外,我們還會介紹另一種停止工作線程的方法,使用Python線程模塊中的事件Event對象。

事件的觸發(fā)機(jī)制可以是多種多樣的。在本例中,主線程只是休眠一段時(shí)間,然后調(diào)用event.set()方法,通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運(yùn)行狀態(tài):

1 if __name__ == '__main__':
2     format = '%(asctime)s: %(message)s'
3     logging.basicConfig(format=format, level=logging.INFO,
4                          datefmt='%H:%M:%S')
5     # logging.getLogger().setLevel(logging.DEBUG)
6
7     pipeline = Pipeline()
8     event = threading.Event()
9     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
10        executor.submit(producer, pipeline, event)
11        executor.submit(consumer, pipeline, event)
12
13        time.sleep(0.1)
14        logging.info('Main: about to set event')
15        event.set()

這里惟一的變化是在第8行創(chuàng)建了事件對象event,在第10行和第11行傳遞了event參數(shù),代碼的最后一個(gè)部分13-15行,先休眠0.1秒,記錄一條消息,然后在事件上調(diào)用.set()方法。

生產(chǎn)者也不用變太多:

def producer(pipeline, event):
    '''Pretend we're getting a number from the network.'''
    while not event.is_set():
    message = random.randint(1, 101)
    logging.info('Producer got message: %s', message)
    pipeline.set_message(message, 'Producer')

    logging.info('Producer received EXIT event. Exiting')

在第3行循環(huán)部分設(shè)置了事件,而且也不用再把SENTINEL值放入管道中。

消費(fèi)者的變化稍多:

def consumer(pipeline, event):
    '''Pretend we're saving a number in the database.'''
    while not event.is_set() or not pipeline.empty():
     message = pipeline.get_message('Consumer')
     logging.info(
     'Consumer storing message: %s  (queue size=%s)',
     message,
     pipeline.qsize(),
        )

    logging.info('Consumer received EXIT event. Exiting')

除了需要?jiǎng)h掉和SENTINEL值相關(guān)的代碼,還要執(zhí)行稍微復(fù)雜一點(diǎn)的循環(huán)條件。它會一直循環(huán),直到事件結(jié)束,管道中的數(shù)據(jù)被清空。

一定要確保當(dāng)消費(fèi)者退出時(shí),隊(duì)列是空的。如果消費(fèi)者在管道包含消息時(shí)退出,可能會出現(xiàn)兩個(gè)問題。一是會丟失那部分?jǐn)?shù)據(jù),但更嚴(yán)重的是生產(chǎn)者會被鎖住。

在生產(chǎn)者檢查.is_set()條件后、但在調(diào)用pipeline.set_message()前觸發(fā)事件,則會發(fā)生這種情況。

一旦發(fā)生這種情況,生產(chǎn)者可能被喚醒并退出,但此時(shí)鎖仍被消費(fèi)者持有。然后,生產(chǎn)者將嘗試用.acquire()方法獲取鎖,但是消費(fèi)者已經(jīng)退出,而且永遠(yuǎn)不會釋放鎖,所以生產(chǎn)者就會一直等下去。

消費(fèi)者的其余部分看起來應(yīng)該很熟悉。

管道類的寫法變化最大:

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug('%s:about to get from queue', name)
        value = self.get()
        logging.debug('%s:got %d from queue', name, value)
        return value

    def set_message(self, value, name):
        logging.debug('%s:about to add %d to queue', name, value)
        self.put(value)
        logging.debug('%s:added %d to queue', name, value)

Pipelinequeue.Queue的一個(gè)子類。Queue隊(duì)列里面有一個(gè)可選參數(shù),在初始化時(shí)指定隊(duì)列所能容納的最大數(shù)據(jù)量。

.get_message().set_message()變得更簡短,被隊(duì)列中的.get().put()方法替代。

大家可能想知道,防止競爭條件的代碼都跑哪里去了?

編寫標(biāo)準(zhǔn)庫的核心開發(fā)人員知道,在多線程環(huán)境中經(jīng)常使用隊(duì)列Queue,因此將所有鎖定代碼合并到了隊(duì)列Queue模塊內(nèi)部。隊(duì)列Queue本身就是線程安全的。

程序運(yùn)行結(jié)果如下:

$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting

生產(chǎn)者創(chuàng)建了5條消息,并將其中4條放到隊(duì)列中。但在放置第5條消息之前,它被操作系統(tǒng)交換出去了。

然后消費(fèi)者開始運(yùn)行并儲存第1條消息,打印出該消息和隊(duì)列大?。?/p>

Consumer storing message: 32 (queue size=3)

這就是為什么第5條消息沒有成功進(jìn)入管道。刪除一條消息后,隊(duì)列的大小縮減到3個(gè)。因?yàn)殛?duì)列最多可以容納10條消息,所以生產(chǎn)者線程沒有被隊(duì)列阻塞,而是被操作系統(tǒng)交換出去了。

注意:每次運(yùn)行所得到的結(jié)果會不同。這就是使用線程的樂趣所在!

當(dāng)程序開始結(jié)束時(shí),主線程觸發(fā)事件,生產(chǎn)者立即退出。但消費(fèi)者仍有很多工作要做,所以它會繼續(xù)運(yùn)行,直到清理完管道中的數(shù)據(jù)為止。

嘗試修改生產(chǎn)者或消費(fèi)者中的隊(duì)列大小和time.sleep()中的休眠時(shí)間,來分別模擬更長的網(wǎng)絡(luò)或磁盤訪問時(shí)間。即使是輕微的更改,也會對結(jié)果產(chǎn)生很大的影響。

對于生產(chǎn)者-消費(fèi)者模型,這是一個(gè)更好的解決方法,但其實(shí)可以進(jìn)一步簡化。去掉管道Pipeline和日志語句,就只剩下和queue.Queue相關(guān)的語句了。

直接使用queue.Queue的最終代碼如下:

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    '''Pretend we're getting a number from the network.'''
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info('Producer got message: %s', message)
        queue.put(message)

    logging.info('Producer received event. Exiting')

def consumer(queue, event):
    '''Pretend we're saving a number in the database.'''
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            'Consumer storing message: %s (size=%d)', message, queue.qsize()
        )

    logging.info('Consumer received event. Exiting')

if __name__ == '__main__':
    format = '%(asctime)s: %(message)s'
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt='%H:%M:%S')

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info('Main: about to set event')
        event.set()

可以看到,使用Python的內(nèi)置基礎(chǔ)模塊能夠簡化復(fù)雜的問題,讓代碼閱讀起來更清晰。

Lock和隊(duì)列Queue是解決并發(fā)問題非常方便的兩個(gè)類,但其實(shí)標(biāo)準(zhǔn)庫還提供了其他類。在結(jié)束本教程之前,讓我們快速瀏覽一下還有哪些類。

9. 線程對象

Python的線程threading模塊還有其他一些基本類型。雖然在上面的例子中沒有用到,但它們會在不同的情況下派上用場,所以熟悉一下還是很好處的。

9.1 信號量

首先要介紹的是信號量thread.semaphore,信號量是具有一些特殊屬性的計(jì)數(shù)器。

第一個(gè)屬性是計(jì)數(shù)的原子性,可以保證操作系統(tǒng)不會在計(jì)數(shù)器遞增或遞減的過程中交換線程。

內(nèi)部計(jì)數(shù)器在調(diào)用.release()時(shí)遞增,在調(diào)用.acquire()時(shí)遞減。

另一個(gè)特殊屬性是,如果線程在計(jì)數(shù)器為0時(shí)調(diào)用.acquire(),那么該線程將阻塞,直到另一個(gè)線程調(diào)用.release()并將計(jì)數(shù)器的值增加到1。

信號量通常用于保護(hù)容量有限的資源。例如,我們有一個(gè)連接池,并且希望限制該連接池中的元素?cái)?shù)量,就可以用信號量來進(jìn)行管理。

9.2 定時(shí)器

threading.Timer是一個(gè)定時(shí)器功能的類,指定函數(shù)在間隔特定時(shí)間后執(zhí)行任務(wù)。我們可以通過傳入需要等待的時(shí)間和函數(shù)來創(chuàng)建一個(gè)定時(shí)器:

t = threading.Timer(30.0, my_function)

調(diào)用.start()啟動定時(shí)器,函數(shù)將在指定時(shí)間過后的某個(gè)時(shí)間點(diǎn)上被新線程調(diào)用。但請注意,這里并不能保證函數(shù)會在我們所期望的確切時(shí)間被調(diào)用,可能會存在誤差。  

如果想要停止已經(jīng)啟動的定時(shí)器,可以調(diào)用.cancel()。在定時(shí)器觸發(fā)后調(diào)用.cancel()不會執(zhí)行任何操作,也不會產(chǎn)生異常。

定時(shí)器可用于在特定時(shí)間之后提示用戶執(zhí)行操作。如果用戶在定時(shí)器過時(shí)之前執(zhí)行了操作,可以調(diào)用.cancel()取消定時(shí)。

9.3 柵欄

threading模塊中的柵欄Barrier可以用來指定需要同步運(yùn)行的線程數(shù)量。創(chuàng)建柵欄Barrier時(shí),我們必須指定所需同步的線程數(shù)。每個(gè)線程都會在Barrier上調(diào)用.wait()方法,它們會先保持阻塞狀態(tài),直到等待的線程數(shù)量達(dá)到指定值時(shí),會被同時(shí)釋放。

注意,線程是由操作系統(tǒng)調(diào)度的,因此,即使所有線程同時(shí)被釋放,一次也只能運(yùn)行一個(gè)線程。

柵欄可以用來初始化一個(gè)線程池。讓線程初始化后在柵欄里等待,可以確保程序在所有線程都完成初始化后再開始運(yùn)行。

米哥點(diǎn)評

感謝Little monster同學(xué)的翻譯和整理。本篇對線程及多線程開發(fā)進(jìn)行了很好的詮釋,從基礎(chǔ)介紹到線程實(shí)例,從入門到進(jìn)階,全方位多角度講解有關(guān)線程開發(fā)方方面面的內(nèi)容,即便是工作了多年的老程序員,看完之后也是收獲頗多。多線程在數(shù)據(jù)采集和處理方面都有不少的應(yīng)用場景,相信對很多Tushare用戶會有所助益,寄以此篇促大家學(xué)好用好,提升數(shù)據(jù)效率。

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多