日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

AQS --- 融會貫通

 貪挽懶月 2022-06-20 發(fā)布于廣東

一、ReentrantLock 加鎖過程簡介

加鎖可以分為三個階段:

  • 嘗試加鎖;
  • 加鎖失敗,線程入AQS隊列;
  • 線程入隊列后進入阻塞狀態(tài)。

二、ReentrantLock 加鎖源碼分析

現(xiàn)有如下場景:

三個人去銀行的一個窗口辦理業(yè)務,一個窗口同一時刻只能接待一位顧客。抽象成代碼就是:

public static void main(String[] args){
 ReentrantLock lock = new ReentrantLock();
 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顧客A辦理業(yè)務");
   TimeUnit.MINUTES.sleep(5);
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "A").start();

 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顧客B辦理業(yè)務");
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "B").start();

 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顧客C辦理業(yè)務");
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "C").start();
}

1. lock 方法:

調用lock.lock()的時候,實際上調用的是 NonfairSync 的 lock 方法,如下:

final void lock() {
 if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());
 else
  acquire(1);
}

compareAndSetState(0, 1) 這里比較并交換,期望 AQS 中的 state 的值是0,如果是,就將其修改成1,返回 true ,否則返回 false 。

  • 返回 true 的時候,執(zhí)行 setExclusiveOwnerThread(Thread.currentThread()) 把占用資源的線程設置成當前線程。當線程A進來的時候,因為 AQS 中的 state 還沒別的線程去修改,所以是0,就會成功修改成 1,就直接加鎖成功了。

  • 返回 false 的時候,就會進入 esle 塊中,執(zhí)行 acquire(1) 方法。假如線程A加鎖成功還沒釋放鎖的時候,線程B進來了,那么就會返回 false 。 acquire(1) 方法又主要包括三個方法,源碼如下:

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  selfInterrupt();
}

接下來依次來看這三個方法。

2. tryAcquire(arg) 方法:

protected final boolean tryAcquire(int acquires) {
 return nonfairTryAcquire(acquires);
}

再點進去看 nonfairTryAcquire(acquires) :

final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
  if (compareAndSetState(0, acquires)) {
   setExclusiveOwnerThread(current);
   return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {
  int nextc = c + acquires;
  if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
  setState(nextc);
  return true;
 }
 return false;
}

線程B進到這段代碼就有三種情況:

  • currentThread 就是線程B,c 就是 AQS 中的 state,即1,c 不等于0,所以跑到 else if 中,判斷當前線程和持有鎖的線程是否相同。current 是 B,而當前持有鎖的是 A,也不相等,所以直接 return false。

  • 如果線程B進來的時候 A 剛好走了,即 c 等于0,那么進到 if 里面。if 里面做的事就是再將 state 改成1,同時設置當前占有鎖的線程為 B,然后返回 true;

  • 如果當前線程等于當前占有鎖的線程,即進來的還是線程A,那么就修改 state 的值(當前值加1),然后返回 true。

3. addWaiter(Node.EXCLUSIVE) 方法:

上面說了 tryAcquire(arg) 方法,當該方法返回 false 的時候,就會執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,那么先看它里面的 addWaiter(Node.EXCLUSIVE) 方法,源碼如下:

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
  node.prev = pred;
  if (compareAndSetTail(pred, node)) {
   pred.next = node;
   return node;
  }
 }
 enq(node);
 return node;
}

無特殊說明的時候,以下情況基于:當前持有鎖的是線程A,并且還沒釋放,進來的是線程B。

這個方法首先將線程B封裝成 Node,傳進來的 mode 是 AQS 中的一個屬性,還沒哪里賦過值,所以是 null,當前的 tail 其實也是 null,因為 AQS 隊列中現(xiàn)在還沒別的等待線程。是 null 的話,就執(zhí)行入隊方法 enq(node),該方法如下:

private Node enq(final Node node) {
 for (;;) {
  Node t = tail;
  if (t == null) { // Must initialize
   if (compareAndSetHead(new Node()))
    tail = head;
  } else {
   node.prev = t;
   if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
   }
  }
 }
}

for (; ;) 其實就是相當于 while(true),進行自旋。當前的 tail 是 null ,所以進入 if 中,這里有個 compareAndSetHead(new Node()) 方法,這里是 new 了一個節(jié)點,姑且叫它傀儡節(jié)點,將它設置為頭結點,如果 new 成功了,尾結點也指向它。效果如下圖:

傀儡節(jié)點

第二次循環(huán)的時候,t 就是不再是 null 了,而是指向剛才那個傀儡節(jié)點了,所以進入 else 中。else 中做的事就是,將傳進來的節(jié)點,即封裝了線程B的節(jié)點 node,將其 prev 設置成剛才new 的那個傀儡節(jié)點,再將 tail 指向 封裝了線程B的 node;再將傀儡節(jié)點的 next 指針指向封裝了線程B的 node,如下圖:

node入隊

當線程C進到 addWaiter(Node mode) 方法,此時 tail 不是 null 了,已經指向了 線程B的節(jié)點,所以進入到 if 塊里面,執(zhí)行:

if (pred != null) {
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
  pred.next = node;
  return node;
 }
}

這里就是將線程C的 prev 設置為當前的 tail,即線程B的 node,然后將線程C設置為 tail,再將線程B的 next 設置為線程C。最后效果圖如下:

線程C入隊

4. acquireQueued(final Node node, int arg) 方法:

再來看看這個方法:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
  boolean interrupted = false;
  for (;;) {
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

這個固定的,假如傳進來的 node 是線程B,首先會進入自旋,看看 predecessor 這方法:

final Node predecessor() throws NullPointerException {
 Node p = prev;
 if (p == null)
  throw new NullPointerException();
 else
  return p;
}

首先讓 p 等于 prev,此時線程B 節(jié)點的 prev是誰呢,直接看我上面畫的圖可以知道,線程B的 prev 就是傀儡節(jié)點,所以這里 return 的就是傀儡節(jié)點。

回到外層,傀儡節(jié)點等于 head,所以又會執(zhí)行 tryAcquire(arg),即又重復上述步驟再次嘗試獲取鎖。因為此時線程A持有鎖,所以線程B嘗試獲取鎖會失敗,即 tryAcquire(arg) 會返回 false。

返回 false, 那就執(zhí)行下一個 if。首先看 shouldParkAfterFailedAcquire(p, node) 方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)
  /*
   * This node has already set status asking a release
   * to signal it, so it can safely park.
   */
  return true;
 if (ws > 0) {
  /*
   * Predecessor was cancelled. Skip over predecessors and
   * indicate retry.
   */
  do {
   node.prev = pred = pred.prev;
  } while (pred.waitStatus > 0);
  pred.next = node;
 } else {
  /*
   * waitStatus must be 0 or PROPAGATE.  Indicate that we
   * need a signal, but don't park yet.  Caller will need to
   * retry to make sure it cannot acquire before parking.
   */
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}

prev 是傀儡節(jié)點,它的 waitStatus 是0,因為傀儡節(jié)點 new 出來以后還沒改過它的 waitStatus 的值,默認是0。Node.SIGNAL 的值是 -1,不相等,0 也不大于 0,所以進入 else,執(zhí)行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL),這是比較并交換,將傀儡節(jié)點的 waitStatus 的值由 0 改為 -1,最后返回了 false。

shouldParkAfterFailedAcquire(p, node) 方法返回了 false,因為自旋,所以又回到 final Node p = node.predecessor(); 這一行。此時 p 節(jié)點還是傀儡節(jié)點,再去嘗試獲取鎖,如果線程A還是釋放,又獲取失敗了,就會再次執(zhí)行 shouldParkAfterFailedAcquire(p, node) 方法。

再次執(zhí)行 shouldParkAfterFailedAcquire(p, node) 的時候,傀儡節(jié)點的 waitStatus 就已經是 -1 了,所以直接 return true。

這里 return 了 true,就會執(zhí)行 parkAndCheckInterrupt() 方法,看看這個方法源碼:

private final boolean parkAndCheckInterrupt() {
 LockSupport.park(this);
 return Thread.interrupted();
}

這里的 this 就是線程B,這里調用了 park 方法,就讓線程B 在等著了。線程C進來也一樣,執(zhí)行到這一步,就會調用 park 方法,一直在等著。當線程A釋放鎖了,就會調用 unpark 方法,線程B和線程C就有一個可以搶到鎖了。

5. unlock 方法:

當線程A調用了 unlock 方法,實際上調用的是:

 public void unlock() {
 sync.release(1);
}

點進去之后是這樣的:

public final boolean release(int arg) {
 if (tryRelease(arg)) {
  Node h = head;
  if (h != null && h.waitStatus != 0)
   unparkSuccessor(h);
  return true;
 }
 return false;
}

再點進去看 tryRelease(arg 方法:

 protected final boolean tryRelease(int releases) {
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
  throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
  free = true;
  setExclusiveOwnerThread(null);
 }
 setState(c);
 return free;
}

線程A getState 是 1,傳進來的 releases 也是 1,所以相減結果就是 0。因為結果是 0,所以會將 free 改成 true,然后調用 setExclusiveOwnerThread(null); 將當前持有鎖的線程設置為 null。然后設置 AQS 的 state 等于 c,即等于 0,最后該方法 return true。

回到上一層,此時的 head 是傀儡節(jié)點,不為空,并且傀儡節(jié)點的 waitStatus 剛才改成了 -1,不等于 0,所以會調用 unparkSuccessor(h); 方法:

private void unparkSuccessor(Node node) {
 /*
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling.  It is OK if this
  * fails or if status is changed by waiting thread.
  */
 int ws = node.waitStatus;
 if (ws < 0)
  compareAndSetWaitStatus(node, ws, 0);

 /*
  * Thread to unpark is held in successor, which is normally
  * just the next node.  But if cancelled or apparently null,
  * traverse backwards from tail to find the actual
  * non-cancelled successor.
  */
 Node s = node.next;
 if (s == null || s.waitStatus > 0) {
  s = null;
  for (Node t = tail; t != null && t != node; t = t.prev)
   if (t.waitStatus <= 0)
    s = t;
 }
 if (s != null)
  LockSupport.unpark(s.thread);
}

這里傳進來的節(jié)點是傀儡節(jié)點,它的 waitStatus 是 -1,小于 0,所以就會執(zhí)行 compareAndSetWaitStatus(node, ws, 0) 進行比較并交換,又將傀儡節(jié)點的 waitStatus 改成了 0。

繼續(xù)往下執(zhí)行,得到的 s 就是線程B所在節(jié)點,不為空,并且線程B節(jié)點的 waitStatus 還沒改過,是 0,所以直接執(zhí)行 LockSupport.unpark(s.thread)。

因為調用了 unpark,所以剛才阻塞的線程B在 acquireQueued(final Node node, int arg) 方法中的自旋就繼續(xù)進行,就會調用 tryAcquire(arg) 方法,這次因為A已經釋放鎖了,所以該方法會返回 true,就會執(zhí)行 if 里面的代碼:

if (p == head && tryAcquire(arg)) {
 setHead(node);
 p.next = null; // help GC
 failed = false;
 return interrupted;
}

看看 setHead(node) 方法:

private void setHead(Node node) {
 head = node;
 node.thread = null;
 node.prev = null;
}

這里的 node 就是線程B節(jié)點,將頭結點指向線程B節(jié)點,將線程B節(jié)點的線程設置為空,前驅設置為空。外層再把傀儡節(jié)點的 next 指針設置為空,所以最終效果就是:

傀儡節(jié)點出隊

最終是傀儡節(jié)點出隊,以前線程B所在節(jié)點成為新的傀儡節(jié)點。因為之前的傀儡節(jié)點已經沒有任何引用指向它了,就會被 GC 回收。


掃描二維碼

    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多