概述本文主要分析JCU包中讀寫鎖接口(ReadWriteLock )的重要實現(xiàn)類ReentrantReadWriteLock 。主要實現(xiàn)讀共享,寫互斥功能,對比單純的互斥鎖在共享資源使用場景為頻繁讀取及少量修改的情況下可以較好的提高性能。 ReadWriteLock接口簡單說明ReadWriteLock接口只定義了兩個方法: public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock();}
通過調(diào)用相應(yīng)方法獲取讀鎖或?qū)戞i,獲取的讀鎖及寫鎖都是Lock 接口的實現(xiàn),可以如同使用Lock 接口一樣使用(其實也有一些特性是不支持的)。 ReentrantReadWriteLock使用示例讀寫鎖的使用并不復(fù)雜,可以參考以下使用示例: class RWDictionary { private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
與普通重入鎖使用的主要區(qū)別在于需要使用不同的鎖對象引用讀寫鎖,并且在讀寫時分別調(diào)用對應(yīng)的鎖。 ReentrantReadWriteLock鎖實現(xiàn)分析本節(jié)通過學習源碼分析可重入讀寫鎖的實現(xiàn)。 圖解重要函數(shù)及對象關(guān)系根據(jù)示例代碼可以發(fā)現(xiàn),讀寫鎖需要關(guān)注的重點函數(shù)為獲取讀鎖及寫鎖的函數(shù),對于讀鎖及寫鎖對象則主要關(guān)注加鎖和解鎖函數(shù),這幾個函數(shù)及對象關(guān)系如下圖: 從圖中可見讀寫鎖的加鎖解鎖操作最終都是調(diào)用ReentrantReadWriteLock 類的內(nèi)部類Sync 提供的方法。與{% post_link 細談重入鎖ReentrantLock %}一文中描述相似,Sync 對象通過繼承AbstractQueuedSynchronizer 進行實現(xiàn),故后續(xù)分析主要基于Sync 類進行。 讀寫鎖Sync 結(jié)構(gòu)分析Sync 繼承于AbstractQueuedSynchronizer ,其中主要功能均在AbstractQueuedSynchronizer 中完成,其中最重要功能為控制線程獲取鎖失敗后轉(zhuǎn)換為等待狀態(tài)及在滿足一定條件后喚醒等待狀態(tài)的線程。先對AbstractQueuedSynchronizer 進行觀察。
AbstractQueuedSynchronizer 圖解
為了更好理解AbstractQueuedSynchronizer 的運行機制,可以首先研究其內(nèi)部數(shù)據(jù)結(jié)構(gòu),如下圖: 圖中展示AQS類較為重要的數(shù)據(jù)結(jié)構(gòu),包括int 類型變量state 用于記錄鎖的狀態(tài),繼承自AbstractOwnableSynchronizer 類的Thread 類型變量exclusiveOwnerThread 用于指向當前排他的獲取鎖的線程,AbstractQueuedSynchronizer.Node 類型的變量head 及tail 。 其中Node 對象表示當前等待鎖的節(jié)點,Node 中thread 變量指向等待的線程,waitStatus 表示當前等待節(jié)點狀態(tài),mode 為節(jié)點類型。多個節(jié)點之間使用prev 及next 組成雙向鏈表,參考CLH鎖隊列的方式進行鎖的獲取,但其中與CLH隊列的重要區(qū)別在于CLH隊列中后續(xù)節(jié)點需要自旋輪詢前節(jié)點狀態(tài)以確定前置節(jié)點是否已經(jīng)釋放鎖,期間不釋放CPU資源,而AQS 中Node 節(jié)點指向的線程在獲取鎖失敗后調(diào)用LockSupport.park 函數(shù)使其進入阻塞狀態(tài),讓出CPU資源,故在前置節(jié)點釋放鎖時需要調(diào)用unparkSuccessor 函數(shù)喚醒后繼節(jié)點。 根據(jù)以上說明可得知此上圖圖主要表現(xiàn)當前thread0 線程獲取了鎖,thread1 線程正在等待。 讀寫鎖Sync 對于AQS 使用讀寫鎖中Sync 類是繼承于AQS ,并且主要使用上文介紹的數(shù)據(jù)結(jié)構(gòu)中的state 及waitStatus 變量進行實現(xiàn)。 實現(xiàn)讀寫鎖與實現(xiàn)普通互斥鎖的主要區(qū)別在于需要分別記錄讀鎖狀態(tài)及寫鎖狀態(tài),并且等待隊列中需要區(qū)別處理兩種加鎖操作。
Sync 使用state 變量同時記錄讀鎖與寫鎖狀態(tài),將int 類型的state 變量分為高16位與第16位,高16位記錄讀鎖狀態(tài),低16位記錄寫鎖狀態(tài),如下圖所示:
Sync 使用不同的mode 描述等待隊列中的節(jié)點以區(qū)分讀鎖等待節(jié)點和寫鎖等待節(jié)點。mode 取值包括SHARED 及EXCLUSIVE 兩種,分別代表當前等待節(jié)點為讀鎖和寫鎖。 讀寫鎖Sync 代碼過程分析寫鎖加鎖通過對于重要函數(shù)關(guān)系的分析,寫鎖加鎖最終調(diào)用Sync 類的acquire 函數(shù)(繼承自AQS ) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
現(xiàn)在分情況圖解分析 無鎖狀態(tài)無鎖狀態(tài)AQS 內(nèi)部數(shù)據(jù)結(jié)構(gòu)如下圖所示: 其中state 變量為0,表示高位地位地位均為0,沒有任何鎖,且等待節(jié)點的首尾均指向空(此處特指head節(jié)點沒有初始化時),鎖的所有者線程也為空。 在無鎖狀態(tài)進行加鎖操作,線程調(diào)用acquire 函數(shù),首先使用tryAcquire 函數(shù)判斷鎖是否可獲取成功,由于當前是無鎖狀態(tài)必然成功獲取鎖(如果多個線程同時進入此函數(shù),則有且只有一個線程可調(diào)用compareAndSetState 成功,其他線程轉(zhuǎn)入獲取鎖失敗的流程)。獲取鎖成功后AQS 狀態(tài)為: 有鎖狀態(tài)在加寫鎖時如果當前AQS 已經(jīng)是有鎖狀態(tài),則需要進一步處理。有鎖狀態(tài)主要分為已有寫鎖和已有讀鎖狀態(tài),并且根據(jù)最終當前線程是否可直接獲取鎖分為兩種情況: - 非重入:如果滿足一下兩個條件之一,當前線程必須加入等待隊列(暫不考慮非公平鎖搶占情況)
a. 已有讀鎖; b. 有寫鎖且獲取寫鎖的線程不為當前請求鎖的線程。 - 重入:有寫鎖且當前獲取寫鎖的線程與當前請求鎖的線程為同一線程,則直接獲取鎖并將寫鎖狀態(tài)值加1。
寫鎖重入狀態(tài)如圖: 寫鎖非重入等待狀態(tài)如圖: 在非重入狀態(tài),當前線程創(chuàng)建等待節(jié)點追加到等待隊列隊尾,如果當前頭結(jié)點為空,則需要創(chuàng)建一個默認的頭結(jié)點。 之后再當前獲取鎖的線程釋放鎖后,會喚醒等待中的節(jié)點,即為thread1 。如果當前等待隊列存在多個等待節(jié)點,由于thread1 等待節(jié)點為EXCLUSIVE 模式,則只會喚醒當前一個節(jié)點,不會傳播喚醒信號。 讀鎖加鎖通過對于重要函數(shù)關(guān)系的分析,寫鎖加鎖最終調(diào)用Sync 類的acquireShared 函數(shù)(繼承自AQS ): public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
同上文,現(xiàn)在分情況圖解分析 無鎖狀態(tài)無所狀態(tài)AQS 內(nèi)部數(shù)據(jù)狀態(tài)圖與寫加鎖是無鎖狀態(tài)一致: 在無鎖狀態(tài)進行加鎖操作,線程調(diào)用acquireShared 函數(shù),首先使用tryAcquireShared 函數(shù)判斷共享鎖是否可獲取成功,由于當前為無鎖狀態(tài)則獲取鎖一定成功(如果同時多個線程在讀鎖進行競爭,則只有一個線程能夠直接獲取讀鎖,其他線程需要進入fullTryAcquireShared 函數(shù)繼續(xù)進行鎖的獲取,該函數(shù)在后文說明)。當前線程獲取讀鎖成功后,AQS 內(nèi)部結(jié)構(gòu)如圖所示: 其中有兩個新的變量:firstReader 及firstReaderHoldCount 。firstReader 指向在無鎖狀態(tài)下第一個獲取讀鎖的線程,firstReaderHoldCount 記錄第一個獲取讀鎖的線程持有當前鎖的計數(shù)(主要用于重入)。 有鎖狀態(tài)無鎖狀態(tài)獲取讀鎖比較簡單,在有鎖狀態(tài)則需要分情況討論。其中需要分當前被持有的鎖是讀鎖還是寫鎖,并且每種情況需要區(qū)分等待隊列中是否有等待節(jié)點。 已有讀鎖且等待隊列為空此狀態(tài)比較簡單,圖示如: 此時線程申請讀鎖,首先調(diào)用readerShouldBlock 函數(shù)進行判斷,該函數(shù)根據(jù)當前鎖是否為公平鎖判斷規(guī)則稍有不同。如果為非公平鎖,則只需要當前第一個等待節(jié)點不是寫鎖就可以嘗試獲取鎖(考慮第一點為寫鎖主要為了方式寫鎖“餓死”);如果是公平鎖則只要有等待節(jié)點且當前鎖不為重入就需要等待。 由于本節(jié)的前提是等待隊列為空的情況,故readerShouldBlock 函數(shù)一定返回false ,則當前線程使用CAS 對讀鎖計數(shù)進行增加(同上文,如果同時多個線程在讀鎖進行競爭,則只有一個線程能夠直接獲取讀鎖,其他線程需要進入fullTryAcquireShared 函數(shù)繼續(xù)進行鎖的獲?。?。 在成功對讀鎖計數(shù)器進行增加后,當前線程需要繼續(xù)對當前線程持有讀鎖的計數(shù)進行增加。此時分為兩種情況: - 當前線程是第一個獲取讀鎖的線程,此時由于第一個獲取讀鎖的線程已經(jīng)通過
firstReader 及firstReaderHoldCount 兩個變量進行存儲,則僅僅需要將firstReaderHoldCount 加1即可; - 當前線程不是第一個獲取讀鎖的線程,則需要使用
readHolds 進行存儲,readHolds 是ThreadLocal 的子類,通過readHolds 可獲取當前線程對應(yīng)的HoldCounter 類的對象,該對象保存了當前線程獲取讀鎖的計數(shù)??紤]程序的局部性原理,又使用cachedHoldCounter 緩存最近使用的HoldCounter 類的對象,如在一段時間內(nèi)只有一個線程請求讀鎖則可加速對讀鎖獲取的計數(shù)。
第一個讀鎖線程重入如圖: 非首節(jié)點獲取讀鎖 根據(jù)上圖所示,thread0 為首節(jié)點,thread1 線程繼續(xù)申請讀鎖,獲取成功后使用ThreadLocal 鏈接的方式進行存儲計數(shù)對象,并且由于其為最近獲取讀鎖的線程,則cachedHoldCounter 對象設(shè)置指向thread1 對應(yīng)的計數(shù)對象。 已有讀鎖且等待隊列不為空在當前鎖已經(jīng)被讀鎖獲取,且等待隊列不為空的情況下 ,可知等待隊列的頭結(jié)點一定為寫鎖獲取等待,這是由于在讀寫鎖實現(xiàn)過程中,如果某線程獲取了讀鎖,則會喚醒當前等到節(jié)點之后的所有等待模式為SHARED 的節(jié)點,直到隊尾或遇到EXCLUSIVE 模式的等待節(jié)點(具體實現(xiàn)函數(shù)為setHeadAndPropagate 后續(xù)還會遇到)。所以可以確定當前為讀鎖狀態(tài)其有等待節(jié)點情況下,首節(jié)點一定是寫鎖等待。如圖所示: 上圖展示當前thread0 與thread1 線程獲取讀鎖,thread0 為首個獲取讀鎖的節(jié)點,并且thread2 線程在等待獲取寫鎖。 在上圖顯示的狀態(tài)下,無論公平鎖還是非公平鎖的實現(xiàn),新的讀鎖加鎖一定會進行排隊,添加等待節(jié)點在寫鎖等待節(jié)點之后,這樣可以防止寫操作的餓死。申請讀鎖后的狀態(tài)如圖所示: 如圖所示,在當前鎖被為讀鎖且有等待隊列情況下,thread3 及thread4 線程申請讀鎖,則被封裝為等待節(jié)點追加到當前等待隊列后,節(jié)點模式為SHARED ,線程使用LockSupport.park 函數(shù)進入阻塞狀態(tài),讓出CPU資源,直到前驅(qū)的等待節(jié)點完成鎖的獲取和釋放后進行喚醒。 已有寫鎖被獲取當前線程申請讀鎖時發(fā)現(xiàn)寫鎖已經(jīng)被獲取,則無論等待隊列是否為空,線程一定會需要加入等待隊列(注意在非公平鎖實現(xiàn)且前序沒有寫鎖申請的等待,線程有機會搶占獲取鎖而不進入等待隊列)。寫鎖被獲取的情況下,AQS 狀態(tài)為如下狀態(tài) 在兩種情況下,讀鎖獲取都會進入等待隊列等待前序節(jié)點喚醒,這里不再贅述。 讀等待節(jié)點被喚醒讀寫鎖與單純的排他鎖主要區(qū)別在于讀鎖的共享性,在讀寫鎖實現(xiàn)中保證讀鎖能夠共享的其中一個機制就在于,如果一個讀鎖等待節(jié)點被喚醒后其會繼續(xù)喚醒拍在當前喚醒節(jié)點之后的SHARED 模式等待節(jié)點。查看源碼: private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //注意看這里 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在for 循環(huán)中,線程如果獲取讀鎖成功后,需要調(diào)用setHeadAndPropagate 方法。查看其源碼: private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
在滿足傳播條件情況下,獲取讀鎖后繼續(xù)喚醒后續(xù)節(jié)點,所以如果當前鎖是讀鎖狀態(tài)則等待節(jié)點第一個節(jié)點一定是寫鎖等待節(jié)點。 鎖降級鎖降級算是獲取讀鎖的特例,如在t0 線程已經(jīng)獲取寫鎖的情況下,再調(diào)取讀鎖加鎖函數(shù)則可以直接獲取讀鎖,但此時其他線程仍然無法獲取讀鎖或?qū)戞i,在t0 線程釋放寫鎖后,如果有節(jié)點等待則會喚醒后續(xù)節(jié)點,后續(xù)節(jié)點可見的狀態(tài)為目前有t0 線程獲取了讀鎖。 所降級有什么應(yīng)用場景呢?引用讀寫鎖中使用示例代碼 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } }
其中針對變量cacheValid 的使用主要過程為加讀鎖、讀取、釋放讀鎖、加寫鎖、修改值、加讀鎖、釋放寫鎖、使用數(shù)據(jù)、釋放讀鎖。其中后續(xù)幾步(加寫鎖、修改值、加讀鎖、釋放寫鎖、使用數(shù)據(jù)、釋放讀鎖)為典型的鎖降級。如果不使用鎖降級,則過程可能有三種情況: - 第一種:加寫鎖、修改值、釋放寫鎖、使用數(shù)據(jù),即使用寫鎖修改數(shù)據(jù)后直接使用剛修改的數(shù)據(jù),這樣可能有數(shù)據(jù)的不一致,如當前線程釋放寫鎖的同時其他線程(如
t0 )獲取寫鎖準備修改(還沒有改)cacheValid 變量,而當前線程卻繼續(xù)運行,則當前線程讀到的cacheValid 變量的值為t0 修改前的老數(shù)據(jù); - 第二種:加寫鎖、修改值、使用數(shù)據(jù)、釋放寫鎖,即將修改數(shù)據(jù)與再次使用數(shù)據(jù)合二為一,這樣不會有數(shù)據(jù)的不一致,但是由于混用了讀寫兩個過程,以排它鎖的方式使用讀寫鎖,減弱了讀寫鎖讀共享的優(yōu)勢,增加了寫鎖(獨占鎖)的占用時間;
- 第三種:加寫鎖、修改值、釋放寫鎖、加讀鎖、使用數(shù)據(jù)、釋放讀鎖,即使用寫鎖修改數(shù)據(jù)后再請求讀鎖來使用數(shù)據(jù),這是時數(shù)據(jù)的一致性是可以得到保證的,但是由于釋放寫鎖和獲取讀鎖之間存在時間差,則當前想成可能會需要進入等待隊列進行等待,可能造成線程的阻塞降低吞吐量。
因此針對以上情況提供了鎖的降級功能,可以在完成數(shù)據(jù)修改后盡快讀取最新的值,且能夠減少寫鎖占用時間。 最后注意,讀寫鎖不支持鎖升級,即獲取讀鎖、讀數(shù)據(jù)、獲取寫鎖、釋放讀鎖、釋放寫鎖這個過程,因為讀鎖為共享鎖,如同時有多個線程獲取了讀鎖后有一個線程進行鎖升級獲取了寫鎖,這會造成同時有讀鎖(其他線程)和寫鎖的情況,造成其他線程可能無法感知新修改的數(shù)據(jù)(此為邏輯性錯誤),并且在JAVA讀寫鎖實現(xiàn)上由于當前線程獲取了讀鎖,再次請求寫鎖時必然會阻塞而導(dǎo)致后續(xù)釋放讀鎖的方法無法執(zhí)行,這回造成死鎖(此為功能性錯誤)。 寫鎖釋放鎖過程了解了加鎖過程后解鎖過程就非常簡單,每次調(diào)用解鎖方法都會減少重入計數(shù)次數(shù),直到減為0則喚醒后續(xù)第一個等待節(jié)點,如喚醒的后續(xù)節(jié)點為讀等待節(jié)點,則后續(xù)節(jié)點會繼續(xù)傳播喚醒狀態(tài)。 讀鎖釋放過程讀鎖釋放過比寫鎖稍微復(fù)雜,因為是共享鎖,所以可能會有多個線程同時獲取讀鎖,故在解鎖時需要做兩件事: - 獲取當前線程對應(yīng)的重入計數(shù),并進行減1,此處天生為線程安全的,不需要特殊處理;
- 當前讀鎖獲取次數(shù)減1,此處由于可能存在多線程競爭,故使用自旋
CAS 進行設(shè)置。
完成以上兩步后,如讀狀態(tài)為0,則喚醒后續(xù)等待節(jié)點。 總結(jié)根據(jù)以上分析,本文主要展示了讀寫鎖的場景及方式,并分析讀寫鎖核心功能(加解鎖)的代碼實現(xiàn)。Java讀寫鎖同時附帶了更多其他方法,包括鎖狀態(tài)監(jiān)控和帶超時機制的加鎖方法等,本文不在贅述。并且讀寫鎖中寫鎖可使用Conditon 機制也不在詳細說明。
|