1、前言
之前好幾次看到有人在面經中提到了樂觀鎖與悲觀鎖,但是一本《Java Concurrency In Practice》快看完了都沒有見到過這兩種鎖,今天終于在第15章發(fā)現了它們的蹤跡。
15.2 Hardware support for concurrency
Exclusive locking is a pessimistic technique—it assumes the worst (if you don’t lock your door, gremlins will come in and rearrange your stuff) and doesn’t proceed until you can guarantee, by acquiring the appropriate locks, that other threads will not interfere.
For fine-grained operations, there is an alternate approach that is often more efficient—the optimistic approach, whereby you proceed with an update, hopeful that you can complete it without interference. This approach relies on collision detection to determine if there has been interference from other parties during the update, in which case the operation fails and can be retried (or not). The optimistic approach is like the old saying, “It is easier to obtain forgiveness than permission”, where “easier” here means “more efficient”.
原來樂觀鎖與悲觀鎖并不是特指某個鎖,而是在并發(fā)情況下保證數據完整性的不同策略。悲觀鎖指的就是我們平常使用的加鎖機制,它假設我們總是處于最壞的情況下,如果不加鎖數據完整性就會被破壞。而樂觀鎖指是一種基于沖突檢測的方法,檢測到沖突時操作就會失敗。
2、CAS機制介紹
CAS(Compare And Swap)是一種常見的“樂觀鎖”,大部分的CPU都有對應的匯編指令,它有三個操作數:內存地址V,舊值A,新值B。只有當前內存地址V上的值是A,B才會被寫到V上,否則操作失敗。
1 2 3 4 5 6 7 8 9 10 11 12 | public class SimulatedCAS {
private int value;
public synchronized int get() { return value; }
public synchronized int compareAndSwap( int expectedValue, int newValue) {
int oldValue = value;
if (oldValue == expectedValue)
value = newValue;
return oldValue;
}
}
|
上邊的類模擬了CAS操作,如果成員變量 value 的值與參數 expecredValue 的值不同,那就說明其他的線程已對其進行了修改,本次操作失敗。
接下來看一個使用CAS實現線程安全的計數器的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
}
while (v != value.compareAndSwap(v, v + 1 ));
return v + 1 ;
}
}
|
在并發(fā)數不是特別高的情況下,使用CAS的樂觀鎖在性能上要優(yōu)于使用加鎖方式的悲觀鎖,因為大部分情況下經過數次輪詢后CAS操作都可以成功,而使用加鎖機制則會造成線程的阻塞與調度,相對而言更耗時。
Java從5.0開始引入了對CAS的支持,與之對應的是 java.util.concurrent.atomic 包下的AtomicInteger、AtomicReference等類,它們提供了基于CAS的讀寫操作和并發(fā)環(huán)境下的內存可見性。
3、非阻塞算法
通過CAS可以實現高效的支持并發(fā)訪問的數據結構,首先來看一個棧的實現。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<>();
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null )
return null ;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
}
public class Node <E> {
public final E item;
public Node<E> next;
public Node(E item) {
this .item = item;
}
}
|
這個棧實際上是一個單鏈表結構,變量 top 代表頭節(jié)點。當 push() 被調用后,首先新建一個節(jié)點 newHead ,它的后繼節(jié)點應該為當前的頭節(jié)點。然后使用CAS操作嘗試將新節(jié)點賦值給 top ,如果 top 沒有發(fā)生變化,CAS操作成功。如果 top 已改變(可能是某個線程加入或移除了元素),CAS操作失敗,替換新節(jié)點的后繼節(jié)點為當前的頭節(jié)點,再次嘗試。無論CAS操作是否成功,鏈表的結構都不會被破壞。
現在為止我們已經看了兩個基于CAS操作的例子,一個是計數器另一個是棧,它們都有一個共同的特點,那就是它們的狀態(tài)只由一個變量決定,而且顯然CAS操作一個也只能更新一個變量。那么如何使用CAS實現更為復雜的數據結構呢?這里給出一個隊列的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | public class Node <E> {
final E item;
final AtomicReference<Node<E>> next;
public Node(E item, Node<E> next) {
this .item = item;
this .next = new AtomicReference<Node<E>>(next);
}
}
public class LinkedQueue <E> {
private final Node<E> dummy = new Node<E>( null , null );
private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);
public boolean put(E item) {
Node<E> newNode = new Node<E>(item, null );
while ( true ) {
Node<E> curTail = tail.get();
Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null ) {
// Queue in intermediate state, advance tail
tail.compareAndSet(curTail, tailNext);
} else {
// In quiescent state, try inserting new node
if (curTail.next.compareAndSet( null , newNode)) {
// Insertion succeeded, try advancing tail
tail.compareAndSet(curTail, newNode);
return true ;
}
}
}
}
}
}
|
為了維護隊列結構的有效性,我們必須要做到,如果線程A正在修改鏈表,線程B即將修改鏈表,線程B應該可以得知當前某個線程正對鏈表進行操作,線程B不可以立即對其進行修改。然后,如果線程B發(fā)現線程A正在修改鏈表,鏈表中應該含有足夠的信息使線程B能夠“幫助”線程A完成工作。線程B完成了線程A未完成的工作后,線程B就可以立即開始執(zhí)行自己的任務,而且線程A應該可以知道,線程B已經替自己完成了剩下的工作。這個算法實際是Michael-Scott nonblocking linked-queue algorithm,JDK中的ConcurrentLinkedQueue就是使用了這個算法。
現在來看一下具體的實現,首先表頭 head 跟表尾 tail 在初始化時被指向了一個名叫 dummy 的哨兵節(jié)點。如果鏈表正在被修改,指針狀態(tài)如下:

修改完成后指針狀態(tài)如下:

算法的關鍵之處就在于,如果鏈表正在被修改,那么 tail 指向的節(jié)點的 next 屬性是不為 null 的。那么任何線程只要發(fā)現 tail.next 不為 null ,它就可以斷定鏈表當前正在被另一個線程操作。而且更巧妙的是,這個線程可以通過將 tail.next 賦值給 tail 來幫助另一個線程完成工作。
附:
在上邊的隊列中,每個Node節(jié)點在初始化時都要新創(chuàng)建一個AtomicReference對象,我們還可以對其進行優(yōu)化:
1 2 3 4 5 6 7 8 9 | public class Node<E> {
private final E item;
private volatile Node<E> next;
public Node(E item) {
this .item = item;
}
}
private static AtomicReferenceFieldUpdater<Node, Node> nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node. class , Node. class , "next" );
|
通過AtomicReferenceFieldUpdater,我們可以用CAS的方式對 volatile 修飾變量進行修改,避免創(chuàng)建額外的對象。
參考:《Java Concurrency In Practice》
|