日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

探索JAVA并發(fā) - 并發(fā)容器全家福!

 鷹兔牛熊眼 2019-08-26

作者:acupt,專注Java,架構(gòu)師社區(qū)合伙人!

14個并發(fā)容器,你用過幾個?

不考慮多線程并發(fā)的情況下,容器類一般使用ArrayList、HashMap等線程不安全的類,效率更高。在并發(fā)場景下,常會用到ConcurrentHashMap、ArrayBlockingQueue等線程安全的容器類,雖然犧牲了一些效率,但卻得到了安全。

上面提到的線程安全容器都在java.util.concurrent包下,這個包下并發(fā)容器不少,今天全部翻出來鼓搗一下。

僅做簡單介紹,后續(xù)再分別深入探索。

簡介

  • ConcurrentHashMap:并發(fā)版HashMap

  • CopyOnWriteArrayList:并發(fā)版ArrayList

  • CopyOnWriteArraySet:并發(fā)Set

  • ConcurrentLinkedQueue:并發(fā)隊列(基于鏈表)

  • ConcurrentLinkedDeque:并發(fā)隊列(基于雙向鏈表)

  • ConcurrentSkipListMap:基于跳表的并發(fā)Map

  • ConcurrentSkipListSet:基于跳表的并發(fā)Set

  • ArrayBlockingQueue:阻塞隊列(基于數(shù)組)

  • LinkedBlockingQueue:阻塞隊列(基于鏈表)

  • LinkedBlockingDeque:阻塞隊列(基于雙向鏈表)

  • PriorityBlockingQueue:線程安全的優(yōu)先隊列

  • SynchronousQueue:讀寫成對的隊列

  • LinkedTransferQueue:基于鏈表的數(shù)據(jù)交換隊列

  • DelayQueue:延時隊列

ConcurrentHashMap 并發(fā)版HashMap

最常見的并發(fā)容器之一,可以用作并發(fā)場景下的緩存。底層依然是哈希表,但在JAVA 8中有了不小的改變,而JAVA 7和JAVA 8都是用的比較多的版本,因此經(jīng)常會將這兩個版本的實現(xiàn)方式做一些比較(比如面試中)。

一個比較大的差異就是,JAVA 7中采用分段鎖來減少鎖的競爭,JAVA 8中放棄了分段鎖,采用CAS(一種樂觀鎖),同時為了防止哈希沖突嚴(yán)重時退化成鏈表(沖突時會在該位置生成一個鏈表,哈希值相同的對象就鏈在一起),會在鏈表長度達(dá)到閾值(8)后轉(zhuǎn)換成紅黑樹(比起鏈表,樹的查詢效率更穩(wěn)定)。

CopyOnWriteArrayList 并發(fā)版ArrayList

并發(fā)版ArrayList,底層結(jié)構(gòu)也是數(shù)組,和ArrayList不同之處在于:當(dāng)新增和刪除元素時會創(chuàng)建一個新的數(shù)組,在新的數(shù)組中增加或者排除指定對象,最后用新增數(shù)組替換原來的數(shù)組。

適用場景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場景。

局限:由于讀的時候不會加鎖(讀的效率高,就和普通ArrayList一樣),讀取的當(dāng)前副本,因此可能讀取到臟數(shù)據(jù)。如果介意,建議不用。

看看源碼感受下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;

// 添加元素,有鎖
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 修改時加鎖,保證并發(fā)安全
try {
Object[] elements = getArray(); // 當(dāng)前數(shù)組
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 創(chuàng)建一個新數(shù)組,比老的大一個空間
newElements[len] = e; // 要添加的元素放進(jìn)新數(shù)組
setArray(newElements); // 用新數(shù)組替換原來的數(shù)組
return true;
} finally {
lock.unlock(); // 解鎖
}
}

// 讀元素,不加鎖,因此可能讀取到舊數(shù)據(jù)
public E get(int index) {
return get(getArray(), index);
}
}

CopyOnWriteArraySet 并發(fā)Set

基于CopyOnWriteArrayList實現(xiàn)(內(nèi)含一個CopyOnWriteArrayList成員變量),也就是說底層是一個數(shù)組,意味著每次add都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。

適用場景:在CopyOnWriteArrayList適用場景下加一個,集合別太大(全部遍歷傷不起)。

ConcurrentLinkedQueue 并發(fā)隊列(基于鏈表)

基于鏈表實現(xiàn)的并發(fā)隊列,使用樂觀鎖(CAS)保證線程安全。因為數(shù)據(jù)結(jié)構(gòu)是鏈表,所以理論上是沒有隊列大小限制的,也就是說添加數(shù)據(jù)一定能成功。

ConcurrentLinkedDeque 并發(fā)隊列(基于雙向鏈表)

基于雙向鏈表實現(xiàn)的并發(fā)隊列,可以分別對頭尾進(jìn)行操作,因此除了先進(jìn)先出(FIFO),也可以先進(jìn)后出(FILO),當(dāng)然先進(jìn)后出的話應(yīng)該叫它棧了。

ConcurrentSkipListMap 基于跳表的并發(fā)Map

SkipList即跳表,跳表是一種空間換時間的數(shù)據(jù)結(jié)構(gòu),通過冗余數(shù)據(jù),將鏈表一層一層索引,達(dá)到類似二分查找的效果

ConcurrentSkipListSet 基于跳表的并發(fā)Set

類似HashSet和HashMap的關(guān)系,ConcurrentSkipListSet里面就是一個ConcurrentSkipListMap,就不細(xì)說了。

ArrayBlockingQueue 阻塞隊列(基于數(shù)組)

基于數(shù)組實現(xiàn)的可阻塞隊列,構(gòu)造時必須制定數(shù)組大小,往里面放東西時如果數(shù)組滿了便會阻塞直到有位置(也支持直接返回和超時等待),通過一個鎖ReentrantLock保證線程安全。

用offer操作舉個例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 讀寫共用此鎖,線程間通過下面兩個Condition通信
* 這兩個Condition和lock有緊密聯(lián)系(就是lock的方法生成的)
* 類似Object的wait/notify
*/
final ReentrantLock lock;

/** 隊列不為空的信號,取數(shù)據(jù)的線程需要關(guān)注 */
private final Condition notEmpty;

/** 隊列沒滿的信號,寫數(shù)據(jù)的線程需要關(guān)注 */
private final Condition notFull;

// 一直阻塞直到有東西可以拿出來
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

// 在尾部插入一個元素,隊列已滿時等待指定時間,如果還是不能插入則返回
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 鎖住
try {
// 循環(huán)等待直到隊列有空閑
while (count == items.length) {
if (nanos <= 0)
return false;// 等待超時,返回
// 暫時放出鎖,等待一段時間(可能被提前喚醒并搶到鎖,所以需要循環(huán)判斷條件)
// 這段時間可能其他線程取走了元素,這樣就有機(jī)會插入了
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入一個元素
return true;
} finally {
lock.unlock(); //解鎖
}
}

乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀線程來了不會一直阻塞嗎?

答案就在notEmpty、notFull里,這兩個出自lock的小東西讓鎖有了類似synchronized + wait + notify的功能。傳送門 → 終于搞懂了sleep/wait/notify/notifyAll

LinkedBlockingQueue 阻塞隊列(基于鏈表)

基于鏈表實現(xiàn)的阻塞隊列,想比與不阻塞的ConcurrentLinkedQueue,它多了一個容量限制,如果不設(shè)置默認(rèn)為int最大值。

LinkedBlockingDeque 阻塞隊列(基于雙向鏈表)

類似LinkedBlockingQueue,但提供了雙向鏈表特有的操作。

PriorityBlockingQueue 線程安全的優(yōu)先隊列

構(gòu)造時可以傳入一個比較器,可以看做放進(jìn)去的元素會被排序,然后讀取的時候按順序消費。某些低優(yōu)先級的元素可能長期無法被消費,因為不斷有更高優(yōu)先級的元素進(jìn)來。

SynchronousQueue 數(shù)據(jù)同步交換的隊列

一個虛假的隊列,因為它實際上沒有真正用于存儲元素的空間,每個插入操作都必須有對應(yīng)的取出操作,沒取出時無法繼續(xù)放入。

一個簡單的例子感受一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.*;

public class Main {

public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
// 沒有休息,瘋狂寫入
for (int i = 0; ; i++) {
System.out.println('放入: ' + i);
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();
new Thread(() -> {
try {
// 咸魚模式取數(shù)據(jù)
while (true) {
System.out.println('取出: ' + queue.take());
Thread.sleep((long) (Math.random() * 2000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();
}

}

/* 輸出:

放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3

*/

可以看到,寫入的線程沒有任何sleep,可以說是全力往隊列放東西,而讀取的線程又很不積極,讀一個又sleep一會。輸出的結(jié)果卻是讀寫操作成對出現(xiàn)。

JAVA中一個使用場景就是Executors.newCachedThreadPool(),創(chuàng)建一個緩存線程池。

1
2
3
4
5
6
7
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // 核心線程為0,沒用的線程都被無情拋棄
Integer.MAX_VALUE, // 最大線程數(shù)理論上是無限了,還沒到這個值機(jī)器資源就被掏空了
60L, TimeUnit.SECONDS, // 閑置線程60秒后銷毀
new SynchronousQueue<Runnable>()); // offer時如果沒有空閑線程取出任務(wù),則會失敗,線程池就會新建一個線程
}

LinkedTransferQueue 基于鏈表的數(shù)據(jù)交換隊列

實現(xiàn)了接口TransferQueue,通過transfer方法放入元素時,如果發(fā)現(xiàn)有線程在阻塞在取元素,會直接把這個元素給等待線程。如果沒有人等著消費,那么會把這個元素放到隊列尾部,并且此方法阻塞直到有人讀取這個元素。和SynchronousQueue有點像,但比它更強(qiáng)大。

DelayQueue 延時隊列

可以使放入隊列的元素在指定的延時后才被消費者取出,元素需要實現(xiàn)Delayed接口。

總結(jié)

上面簡單介紹了JAVA并發(fā)包下的一些容器類,知道有這些東西,遇到合適的場景時就能想起有個現(xiàn)成的東西可以用了。想要知其所以然,后續(xù)還得再深入探索一番。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多