前言
一、同步隊列的結(jié)構與實現(xiàn)1、同步隊列的結(jié)構(1)結(jié)構介紹AQS使用的同步隊列是基于一種CLH鎖算法來實現(xiàn)(引用網(wǎng)上資料對CLH簡單介紹):
而在源碼中也有這樣的介紹: /** * Wait queue node class. * * <p>The wait queue is a variant of a "" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. * ........... * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> * .............. 在AQS中的同步隊列結(jié)構以及獲取/釋放鎖都是基于此實現(xiàn)的,這里我們先放一個我畫的基本結(jié)構來理解AQS同步隊列,再進一步介紹一些細節(jié)。 根據(jù)以上圖我們看到:
在源碼中我們可以看到: // 內(nèi)部類Node節(jié)點static final class Node{...}// 同步隊列的head引用private transient volatile Node head;// 同步隊列的tail引用private transient volatile Node tail; (2)節(jié)點構成那么Node結(jié)構的具體構成是什么呢?我們具體看內(nèi)部類Node的源碼: static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */static final int PROPAGATE = -3;/** 等待狀態(tài): * 0 INITAIL: 初始狀態(tài) * 1 CANCELLED: 由于等待超時或者被中斷,需要從同步隊列中取消等待,節(jié)點進入該狀態(tài)不會被改變 * -1 SIGNAL: 當前節(jié)點釋放同步狀態(tài)或被取消,則等待狀態(tài)的后繼節(jié)點被通知 * -2 CONDITION: 節(jié)點在等待隊列中,線程在Condition上,需要其它線程調(diào)用Condition的signal()方法才能從等待隊轉(zhuǎn)移到同步隊列 * -3 PROPAGATE: 表示下一個共享式同步狀態(tài)將會無條件被傳播下去 */volatile int waitStatus;/** 前驅(qū)結(jié)點 */volatile Node prev;/** 后繼節(jié)點 */volatile Node next; /** 獲取同步狀態(tài)的線程 */volatile Thread thread;/** 等待隊列中的后繼節(jié)點 */Node nextWaiter; /** 判斷Node是否是共享模式 */final boolean isShared() {return nextWaiter == SHARED; } /** 返回前驅(qū)結(jié)點 */final Node predecessor() throws NullPointerException { Node p = prev;if (p == null)throw new NullPointerException();elsereturn p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread; } } 從源碼中可以發(fā)現(xiàn):同步隊列中的節(jié)點Node用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點。 節(jié)點是構成同步隊列的基礎,沒有成功獲取同步狀態(tài)的線程將成為節(jié)點加入該隊列的尾部。當一個線程無法獲取同步狀態(tài)時,會被構造成節(jié)點并加入同步隊列中,通過CAS保證設置尾節(jié)點這一步是線程安全的,此時才能認為當前節(jié)點(線程)成功加入同步隊列與尾節(jié)點建立聯(lián)系。具體的實現(xiàn)邏輯請看下面介紹! 2、同步狀態(tài)獲取與釋放(1)獨占式同步狀態(tài)獲取與釋放通過調(diào)用同步器acquire(int arg)方法可以獲取同步狀態(tài),該方法中斷不敏感,也就是由于線程獲取同步狀態(tài)失敗后進入同步隊列中,后序線程對進行中斷操作時,線程不會從同步隊列中移出 public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 同步狀態(tài)獲取主要的流程步驟: 1)首先調(diào)用自定義同步器實現(xiàn)tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態(tài)。 2)如果獲取失敗則構造同步節(jié)點(獨占式Node.EXCLUSIVE)并通過addWaiter(Node ndoe)方法將該節(jié)點加入到同步隊列的尾部,同時enq(node)通過for(;;)循環(huán)保證安全設置尾節(jié)點。 private Node addWaiter(Node mode) {// 根據(jù)給定模式構造NodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 嘗試在尾部添加if (pred != null) { node.prev = pred;// cas方式保證正確添加尾節(jié)點if (compareAndSetTail(pred, node)) { pred.next = node;return node; } }// enq主要是通過for(;;)死循環(huán)來確保節(jié)點正確添加// 在for(;;)死循環(huán)中,通過cas將節(jié)點設置為尾節(jié)點時,才返回;否則一直嘗試設置 enq(node);return node; } private Node enq(final Node node) {for (;;) { Node t = tail;if (t == null) { // Must initialize 當tail節(jié)點為null時,必須初始化構造好 head節(jié)點if (compareAndSetHead(new Node())) tail = head; } else { // 否則就通過cas開始添加尾節(jié)點node.prev = t;if (compareAndSetTail(t, node)) { t.next = node;return t; } } } } 假設原隊列中存在Node-1到Node-4節(jié)點,此時某個線程獲取同步狀態(tài)失敗則構成成Node-5通過CAS方式加入隊列(下圖忽略自旋環(huán)節(jié))。 3)節(jié)點進入同步隊列之后“自旋”,即acquireQueued(final Node node, int arg)方法,在這個方法中,當前node死循環(huán)嘗試獲取鎖狀態(tài),但是只有node的前驅(qū)結(jié)點是Head才能嘗試獲取同步狀態(tài),獲取成功之后立即設置當前節(jié)點為Head,并成功返回。否則就會一直自旋。 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 當前node節(jié)點的前驅(qū)是Head時(p == head),才能有資格去嘗試獲取同步狀態(tài)(tryAcquire(arg))// 這是因為當前節(jié)點的前驅(qū)結(jié)點獲得同步狀態(tài),才能喚醒后繼節(jié)點,即當前節(jié)點if (p == head && tryAcquire(arg)) { // 以上條件滿足之后setHead(node); // 設置當前節(jié)點為Headp.next = null; // help GC // 釋放ndoe的前驅(qū)節(jié)點failed = false;return interrupted; }// 線程被中斷或者前驅(qū)結(jié)點被釋放,則繼續(xù)進入檢查:p == head && tryAcquire(argif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally {if (failed) cancelAcquire(node); } } 此時新加入的Node-5節(jié)點也開始自旋,此時的Head(Node-1)已經(jīng)獲取到了同步狀態(tài),而Node-2退出了自旋,成為了新的Head。 文字總結(jié): 1)同步器會維護一個雙向FIFO隊列,獲取同步失敗的線程將會被構造成Node加入隊尾(并且做自旋檢查:檢查前驅(qū)結(jié)點是否是Head); 2)當前線程想要獲得同步狀態(tài),前提是其前驅(qū)結(jié)點是頭結(jié)點,并且獲得了同步狀態(tài); 3)當Head調(diào)用release(int arg)釋放鎖的同時會喚醒后繼節(jié)點(即當前節(jié)點),后繼節(jié)點結(jié)束自旋 流程圖總結(jié): 同步器的release方法:釋放鎖的同時,喚醒后繼節(jié)點(進而時后繼節(jié)點重新獲取同步狀態(tài)) public final boolean release(int arg) {if (tryRelease(arg)) { Node h = head;if (h != null && h.waitStatus != 0)// 該方法會喚醒Head節(jié)點的后繼節(jié)點,使其重試嘗試獲取同步狀態(tài) unparkSuccessor(h);return true; }return false; } UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)喚醒處于等待狀態(tài)的線程(之后會慢慢看源碼介紹)。 (2)共享式同步狀態(tài)獲取與釋放共享鎖跟獨占式鎖最大的不同就是:某一時刻有多個線程同時獲取到同步狀態(tài),獲取判斷是否獲取同步狀態(tài)成功的關鍵,獲取到的同步狀態(tài)要大于等于0。而其他步驟基本都是一致的,還是從源碼開始分析起:帶后綴Share都為共享式同步方法。 1)acquireShared(int arg)獲取同步狀態(tài):如果獲取失敗則加入隊尾,并且檢查是否具備退出自旋的條件(前驅(qū)結(jié)點是頭結(jié)點并且能成功獲取同步狀態(tài)) public final void acquireShared(int arg) {// tryAcquireShared 獲取同步狀態(tài),大于0才是獲取狀態(tài)成功,否則就是失敗if (tryAcquireShared(arg) < 0)// 獲取狀態(tài)失敗則構造共享Node,加入隊列;// 并且檢查是否具備退出自旋的條件:即preNode為head,并且能獲取到同步狀態(tài) doAcquireShared(arg); } 2)doAcquireShared(arg):獲取失敗的Node加入隊列,如果當前節(jié)點的前驅(qū)結(jié)點是頭結(jié)點的話,嘗試獲取同步狀態(tài),如果大于等于0則在for(;;)中退出(退出自旋)。 private void doAcquireShared(int arg) {// 構造共享模式的Nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);// 前驅(qū)節(jié)點是頭結(jié)點,并且能獲取狀態(tài)成功,則return返回,退出死循環(huán)(自旋)if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GCif (interrupted) selfInterrupt(); failed = false;return; } }if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally {if (failed) cancelAcquire(node); } } 3)releaseShared(int arg):釋放同步狀態(tài),通過loop+CAS方式釋放多個線程的同步狀態(tài)。 public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// 通過loop+CAS方式釋放多個線程的同步狀態(tài) doReleaseShared();return true; }return false; } 二、自定義同步組件(實現(xiàn)Lock,內(nèi)部類Sync繼承AQS)1、實現(xiàn)一個不可重入的互斥鎖Mutex 2、實現(xiàn)指定共享數(shù)量的共享鎖MyShareLock |
|