原文: https:///post/hashma
排版:java技術(shù)棧
網(wǎng)上關(guān)于 HashMap 和 ConcurrentHashMap 的文章確實(shí)不少,不過(guò)缺斤少兩的文章比較多,所以才想自己也寫(xiě)一篇,把細(xì)節(jié)說(shuō)清楚說(shuō)透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說(shuō)不清楚。
終歸是希望能降低大家學(xué)習(xí)的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。
閱讀建議: 四節(jié)基本上可以進(jìn)行獨(dú)立閱讀,建議初學(xué)者可按照以下順序進(jìn)行閱讀,可適當(dāng)降低閱讀門(mén)檻。
Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap
閱讀前提: 本文分析的是源碼,所以至少讀者要熟悉它們的接口使用,同時(shí),對(duì)于并發(fā),讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個(gè)基本的知識(shí),文中不會(huì)對(duì)這些知識(shí)進(jìn)行介紹。Java8 用到了紅黑樹(shù),不過(guò)本文不會(huì)進(jìn)行展開(kāi),感興趣的讀者請(qǐng)自行查找相關(guān)資料。
Java7 HashMap HashMap 是最簡(jiǎn)單的,一來(lái)我們非常熟悉,二來(lái)就是它不支持并發(fā)操作,所以源碼也非常簡(jiǎn)單。
首先,我們用下面這張圖來(lái)介紹 HashMap 的結(jié)構(gòu)。
這個(gè)僅僅是示意圖,因?yàn)闆](méi)有考慮到數(shù)組要擴(kuò)容的情況,具體的后面再說(shuō)。
大方向上,HashMap 里面是一個(gè)數(shù)組 ,然后數(shù)組中每個(gè)元素是一個(gè)單向鏈表 。
上圖中,每個(gè)綠色的實(shí)體是嵌套類(lèi) Entry 的實(shí)例,Entry 包含四個(gè)屬性: key, value, hash 值和用于單向鏈表的 next。
capacity: 當(dāng)前數(shù)組容量,始終保持 2^n,可以擴(kuò)容,擴(kuò)容后數(shù)組大小為當(dāng)前的 2 倍。
loadFactor: 負(fù)載因子,默認(rèn)為 0.75。
threshold: 擴(kuò)容的閾值,等于 capacity * loadFactor
put 過(guò)程分析 還是比較簡(jiǎn)單的,跟著代碼走一遍吧。
public V put (K key, V value ) { // 當(dāng)插入第一個(gè)元素的時(shí)候,需要先初始化數(shù)組大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 如果 key 為 null,感興趣的可以往里看,最終會(huì)將這個(gè) entry 放到 table[0] 中 if (key == null ) return putForNullKey(value ); // 1. 求 key 的 hash 值 int hash = hash(key); // 2. 找到對(duì)應(yīng)的數(shù)組下標(biāo) int i = indexFor(hash, table.length); // 3. 遍歷一下對(duì)應(yīng)下標(biāo)處的鏈表,看是否有重復(fù)的 key 已經(jīng)存在, // 如果有,直接覆蓋,put 方法返回舊值就結(jié)束了 for (Entry<K,V> e = table[i]; e ! = null ; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals (k))) { V oldValue = e.value ; e.value = value ; e.recordAccess(this ); return oldValue; } } modCount++; // 4. 不存在重復(fù)的 key,將此 entry 添加到鏈表中,細(xì)節(jié)后面說(shuō) addEntry(hash, key, value , i); return null ; }
數(shù)組初始化 在第一個(gè)元素插入 HashMap 的時(shí)候做一次數(shù)組的初始化,就是先確定初始的數(shù)組大小,并計(jì)算數(shù)組擴(kuò)容的閾值。
private void inflateTable (int toSize) { // 保證數(shù)組大小一定是 2 的 n 次方。 // 比如這樣初始化:new HashMap(20),那么處理成初始數(shù)組大小是 32 int capacity = roundUpToPowerOf2(toSize); // 計(jì)算擴(kuò)容閾值:capacity * loadFactor threshold = (int ) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1 ); // 算是初始化數(shù)組吧 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); //ignore }
這里有一個(gè)將數(shù)組大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應(yīng)的要求,只不過(guò)實(shí)現(xiàn)的代碼稍微有些不同,后面再看到的時(shí)候就知道了。
計(jì)算具體數(shù)組位置 這個(gè)簡(jiǎn)單,我們自己也能 YY 一個(gè): 使用 key 的 hash 值對(duì)數(shù)組長(zhǎng)度進(jìn)行取模就可以了。
static int indexFor (int hash, int length) { // assert Integer.bitCount(length) == 1 : 'length must be a non-zero power of 2'; return hash & (length-1 ); }
這個(gè)方法很簡(jiǎn)單,簡(jiǎn)單說(shuō)就是取 hash 值的低 n 位。 如在數(shù)組長(zhǎng)度為 32 的時(shí)候,其實(shí)取的就是 key 的 hash 值的低 5 位,作為它在數(shù)組中的下標(biāo)位置。
添加節(jié)點(diǎn)到鏈表中 找到數(shù)組下標(biāo)后,會(huì)先進(jìn)行 key 判重,如果沒(méi)有重復(fù),就準(zhǔn)備將新值放入到鏈表的表頭。
void addEntry (int hash, K key, V value , int bucketIndex ) { // 如果當(dāng)前 HashMap 大小已經(jīng)達(dá)到了閾值,并且新值要插入的數(shù)組位置已經(jīng)有元素了,那么要擴(kuò)容 if ((size >= threshold) && (null ! = table[bucketIndex])) { // 擴(kuò)容,后面會(huì)介紹一下 resize(2 * table.length); // 擴(kuò)容以后,重新計(jì)算 hash 值 hash = (null ! = key) ? hash(key) : 0 ; // 重新計(jì)算擴(kuò)容后的新的下標(biāo) bucketIndex = indexFor(hash, table.length); } // 往下看 createEntry(hash, key, value , bucketIndex); } // 這個(gè)很簡(jiǎn)單,其實(shí)就是將新值放到鏈表的表頭,然后 size++ void createEntry (int hash, K key, V value , int bucketIndex ) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value , e); size++; }
這個(gè)方法的主要邏輯就是先判斷是否需要擴(kuò)容,需要的話(huà)先擴(kuò)容,然后再將這個(gè)新的數(shù)據(jù)插入到擴(kuò)容后的數(shù)組的相應(yīng)位置處的鏈表的表頭。
數(shù)組擴(kuò)容 前面我們看到,在插入新值的時(shí)候,如果當(dāng)前的 size 已經(jīng)達(dá)到了閾值,并且要插入的數(shù)組位置上已經(jīng)有元素,那么就會(huì)觸發(fā)擴(kuò)容,擴(kuò)容后,數(shù)組大小為原來(lái)的 2 倍。
void resize (int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return ; } // 新的數(shù)組 Entry[] newTable = new Entry[newCapacity]; // 將原來(lái)數(shù)組中的值遷移到新的更大的數(shù)組中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; threshold = (int )Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1 ); }
擴(kuò)容就是用一個(gè)新的大數(shù)組替換原來(lái)的小數(shù)組,并將原來(lái)數(shù)組中的值遷移到新的數(shù)組中。
由于是雙倍擴(kuò)容,遷移過(guò)程中,會(huì)將原來(lái) table[i] 中的鏈表的所有節(jié)點(diǎn),分拆到新的數(shù)組的 newTable[i] 和 newTable[i + oldLength] 位置上。 如原來(lái)數(shù)組長(zhǎng)度是 16,那么擴(kuò)容后,原來(lái) table[0] 處的鏈表中的所有元素會(huì)被分配到新數(shù)組中 newTable[0] 和 newTable[16] 這兩個(gè)位置。 代碼比較簡(jiǎn)單,這里就不展開(kāi)了。
get 過(guò)程分析 相對(duì)于 put 過(guò)程,get 過(guò)程是非常簡(jiǎn)單的。
根據(jù) key 計(jì)算 hash 值。
找到相應(yīng)的數(shù)組下標(biāo):hash & (length – 1)。
遍歷該數(shù)組位置處的鏈表,直到找到相等(==或equals)的 key。
public V get (Object key) { // 之前說(shuō)過(guò),key 為 null 的話(huà),會(huì)被放到 table[0],所以只要遍歷下 table[0] 處的鏈表就可以了 if (key == null ) return getForNullKey(); // Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); }
getEntry(key):
final Entry<K,V> getEntry(Object key) { if (size == 0 ) { return null ; } int hash = (key == null ) ? 0 : hash(key); // 確定數(shù)組下標(biāo),然后從頭開(kāi)始遍歷鏈表,直到找到為止 for (Entry<K,V> e = table[indexFor(hash, table.length)]; e ! = null ; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key ! = null && key.equals(k)))) return e; } return null ; }
Java7 ConcurrentHashMap ConcurrentHashMap 和 HashMap 思路是差不多的,但是因?yàn)樗С植l(fā)操作,所以要復(fù)雜一些。
整個(gè) ConcurrentHashMap 由一個(gè)個(gè) Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會(huì)將其描述為分段鎖 。 注意,行文中,我很多地方用了 “槽” 來(lái)代表一個(gè) segment。
簡(jiǎn)單理解就是,ConcurrentHashMap 是一個(gè) Segment 數(shù)組,Segment 通過(guò)繼承 ReentrantLock 來(lái)進(jìn)行加鎖,所以每次需要加鎖的操作鎖住的是一個(gè) segment,這樣只要保證每個(gè) Segment 是線(xiàn)程安全的,也就實(shí)現(xiàn)了全局的線(xiàn)程安全。 推薦閱讀: 史上最全 Java 多線(xiàn)程面試題及答案 。
concurrencyLevel: 并行級(jí)別、并發(fā)數(shù)、Segment 數(shù),怎么翻譯不重要,理解它。默認(rèn)是 16,也就是說(shuō) ConcurrentHashMap 有 16 個(gè) Segments,所以理論上,這個(gè)時(shí)候,最多可以同時(shí)支持 16 個(gè)線(xiàn)程并發(fā)寫(xiě),只要它們的操作分別分布在不同的 Segment 上。 這個(gè)值可以在初始化的時(shí)候設(shè)置為其他值,但是一旦初始化以后,它是不可以擴(kuò)容的。
再具體到每個(gè) Segment 內(nèi)部,其實(shí)每個(gè) Segment 很像之前介紹的 HashMap,不過(guò)它要保證線(xiàn)程安全,所以處理起來(lái)要麻煩些。 推薦閱讀: 幾種線(xiàn)程安全的Map解析 。
初始化 initialCapacity: 初始容量,這個(gè)值指的是整個(gè) ConcurrentHashMap 的初始容量,實(shí)際操作的時(shí)候需要平均分給每個(gè) Segment。
loadFactor: 負(fù)載因子,之前我們說(shuō)了,Segment 數(shù)組不可以擴(kuò)容,所以這個(gè)負(fù)載因子是給每個(gè) Segment 內(nèi)部使用的。
public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (! (loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0 ; int ssize = 1 ; // 計(jì)算并行級(jí)別 ssize,因?yàn)橐3植⑿屑?jí)別是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } // 我們這里先不要那么燒腦,用默認(rèn)值,concurrencyLevel 為 16,sshift 為 4 // 那么計(jì)算出 segmentShift 為 28,segmentMask 為 15,后面會(huì)用到這兩個(gè)值 this .segmentShift = 32 - sshift; this .segmentMask = ssize - 1 ; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是設(shè)置整個(gè) map 初始的大小, // 這里根據(jù) initialCapacity 計(jì)算 Segment 數(shù)組中每個(gè)位置可以分到的大小 // 如 initialCapacity 為 64,那么每個(gè) Segment 或稱(chēng)之為'槽'可以分到 4 個(gè) int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默認(rèn) MIN_SEGMENT_TABLE_CAPACITY 是 2,這個(gè)值也是有講究的,因?yàn)檫@樣的話(huà),對(duì)于具體的槽上, // 插入一個(gè)元素不至于擴(kuò)容,插入第二個(gè)的時(shí)候才會(huì)擴(kuò)容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1 ; // 創(chuàng)建 Segment 數(shù)組, // 并創(chuàng)建數(shù)組的第一個(gè)元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int )(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往數(shù)組寫(xiě)入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this .segments = ss; }
初始化完成,我們得到了一個(gè) Segment 數(shù)組。
我們就當(dāng)是用 new ConcurrentHashMap() 無(wú)參構(gòu)造函數(shù)進(jìn)行初始化的,那么初始化完成后:
Segment 數(shù)組長(zhǎng)度為 16,不可以擴(kuò)容
Segment[i] 的默認(rèn)大小為 2,負(fù)載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個(gè)元素不會(huì)觸發(fā)擴(kuò)容,插入第二個(gè)會(huì)進(jìn)行第一次擴(kuò)容
這里初始化了 segment[0],其他位置還是 null,至于為什么要初始化 segment[0],后面的代碼會(huì)介紹
當(dāng)前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡(jiǎn)單翻譯為移位數(shù)和掩碼,這兩個(gè)值馬上就會(huì)用到。
put 過(guò)程分析 我們先看 put 的主流程,對(duì)于其中的一些關(guān)鍵細(xì)節(jié)操作,后面會(huì)進(jìn)行詳細(xì)介紹。
public V put (K key, V value ) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); // 1. 計(jì)算 key 的 hash 值 int hash = hash(key); // 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j // hash 是 32 位,無(wú)符號(hào)右移 segmentShift(28) 位,剩下低 4 位, // 然后和 segmentMask(15) 做一次與操作,也就是說(shuō) j 是 hash 值的最后 4 位,也就是槽的數(shù)組下標(biāo) int j = (hash >>> segmentShift) & segmentMask; // 剛剛說(shuō)了,初始化的時(shí)候初始化了 segment[0],但是其他位置還是 null, // ensureSegment(j) 對(duì) segment[j] 進(jìn)行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null ) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value , false ); }
第一層皮很簡(jiǎn)單,根據(jù) hash 值很快就能找到相應(yīng)的 Segment,之后就是 Segment 內(nèi)部的 put 操作了。
Segment 內(nèi)部是由 數(shù)組+鏈表 組成的。
final V put (K key, int hash, V value , boolean onlyIfAbsent ) { // 在往該 segment 寫(xiě)入前,需要先獲取該 segment 的獨(dú)占鎖 // 先看主流程,后面還會(huì)具體介紹這部分內(nèi)容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value ); V oldValue; try { // 這個(gè)是 segment 內(nèi)部的數(shù)組 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo) int index = (tab.length - 1 ) & hash; // first 是數(shù)組該位置處的鏈表的表頭 HashEntry<K,V> first = entryAt(tab, index); // 下面這串 for 循環(huán)雖然很長(zhǎng),不過(guò)也很好理解,想想該位置沒(méi)有任何元素和已經(jīng)存在一個(gè)鏈表這兩種情況 for (HashEntry<K,V> e = first;;) { if (e ! = null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals (k))) { oldValue = e.value ; if (! onlyIfAbsent) { // 覆蓋舊值 e.value = value ; ++modCount; } break ; } // 繼續(xù)順著鏈表走 e = e.next; } else { // node 到底是不是 null,這個(gè)要看獲取鎖的過(guò)程,不過(guò)和這里都沒(méi)有關(guān)系。 // 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。 if (node ! = null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value , first); int c = count + 1 ; // 如果超過(guò)了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 擴(kuò)容后面也會(huì)具體分析 else // 沒(méi)有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置, // 其實(shí)就是將新的節(jié)點(diǎn)設(shè)置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { // 解鎖 unlock(); } return oldValue; }
整體流程還是比較簡(jiǎn)單的,由于有獨(dú)占鎖的保護(hù),所以 segment 內(nèi)部的操作并不復(fù)雜。 至于這里面的并發(fā)問(wèn)題,我們稍后再進(jìn)行介紹。
到這里 put 操作就結(jié)束了,接下來(lái),我們說(shuō)一說(shuō)其中幾步關(guān)鍵的操作。
初始化槽: ensureSegment ConcurrentHashMap 初始化的時(shí)候會(huì)初始化第一個(gè)槽 segment[0],對(duì)于其他槽來(lái)說(shuō),在插入第一個(gè)值的時(shí)候進(jìn)行初始化。
這里需要考慮并發(fā),因?yàn)楹芸赡軙?huì)有多個(gè)線(xiàn)程同時(shí)進(jìn)來(lái)初始化同一個(gè)槽 segment[k],不過(guò)只要有一個(gè)成功了就可以。
private Segment<K,V> ensureSegment (int k) { final Segment<K,V>[] ss = this .segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { // 這里看到為什么之前要初始化 segment[0] 了, // 使用當(dāng)前 segment[0] 處的數(shù)組長(zhǎng)度和負(fù)載因子來(lái)初始化 segment[k] // 為什么要用“當(dāng)前”,因?yàn)?nbsp;segment[0] 可能早就擴(kuò)容過(guò)了 Segment<K,V> proto = ss[0 ]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int )(cap * lf); // 初始化 segment[k] 內(nèi)部的數(shù)組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { // 再次檢查一遍該槽是否被其他線(xiàn)程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循環(huán),內(nèi)部用 CAS,當(dāng)前線(xiàn)程成功設(shè)值或其他線(xiàn)程成功設(shè)值后,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { if (UNSAFE.compareAndSwapObject(ss, u, null , seg = s)) break ; } } } return seg; }
總的來(lái)說(shuō),ensureSegment(int k) 比較簡(jiǎn)單,對(duì)于并發(fā)操作使用 CAS 進(jìn)行控制。
如果當(dāng)前線(xiàn)程 CAS 失敗,這里的 while 循環(huán)是為了將 seg 賦值返回。
獲取寫(xiě)入鎖: scanAndLockForPut 前面我們看到,在往某個(gè) segment 中 put 的時(shí)候,首先會(huì)調(diào)用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說(shuō)先進(jìn)行一次 tryLock() 快速獲取該 segment 的獨(dú)占鎖,如果失敗,那么進(jìn)入到 scanAndLockForPut 這個(gè)方法來(lái)獲取鎖。
下面我們來(lái)具體分析這個(gè)方法中是怎么控制加鎖的。
private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value ) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; // negative while locating node // 循環(huán)獲取鎖 while (! tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0 ) { if (e == null ) { if (node == null ) // speculatively create node // 進(jìn)到這里說(shuō)明數(shù)組該位置的鏈表是空的,沒(méi)有任何元素 // 當(dāng)然,進(jìn)到這里的另一個(gè)原因是 tryLock() 失敗,所以該槽存在并發(fā),不一定是該位置 node = new HashEntry<K,V>(hash, key, value , null ); retries = 0 ; } else if (key.equals (e.key)) retries = 0 ; else // 順著鏈表往下走 e = e.next; } // 重試次數(shù)如果超過(guò) MAX_SCAN_RETRIES(單核1多核64),那么不搶了,進(jìn)入到阻塞隊(duì)列等待鎖 // lock() 是阻塞方法,直到獲取鎖后返回 else if (++retries > MAX_SCAN_RETRIES) { lock (); break ; } else if ((retries & 1 ) == 0 && // 這個(gè)時(shí)候是有大問(wèn)題了,那就是有新的元素進(jìn)到了鏈表,成為了新的表頭 // 所以這邊的策略是,相當(dāng)于重新走一遍這個(gè) scanAndLockForPut 方法 (f = entryForHash(this , hash)) ! = first) { e = first = f; // re-traverse if entry changed retries = -1 ; } } return node; }
這個(gè)方法有兩個(gè)出口,一個(gè)是 tryLock() 成功了,循環(huán)終止,另一個(gè)就是重試次數(shù)超過(guò)了 MAX_SCAN_RETRIES,進(jìn)到 lock() 方法,此方法會(huì)阻塞等待,直到成功拿到獨(dú)占鎖。
這個(gè)方法就是看似復(fù)雜,但是其實(shí)就是做了一件事,那就是獲取該 segment 的獨(dú)占鎖 ,如果需要的話(huà)順便實(shí)例化了一下 node。
擴(kuò)容: rehash 重復(fù)一下,segment 數(shù)組不能擴(kuò)容,擴(kuò)容是 segment 數(shù)組某個(gè)位置內(nèi)部的數(shù)組 HashEntry[] 進(jìn)行擴(kuò)容,擴(kuò)容后,容量為原來(lái)的 2 倍。
首先,我們要回顧一下觸發(fā)擴(kuò)容的地方,put 的時(shí)候,如果判斷該值的插入會(huì)導(dǎo)致該 segment 的元素個(gè)數(shù)超過(guò)閾值,那么先進(jìn)行擴(kuò)容,再插值,讀者這個(gè)時(shí)候可以回去 put 方法看一眼。
該方法不需要考慮并發(fā),因?yàn)榈竭@里的時(shí)候,是持有該 segment 的獨(dú)占鎖的。
// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。 private void rehash (HashEntry<K,V> node ) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍 int newCapacity = oldCapacity << 1 ; threshold = (int )(newCapacity * loadFactor); // 創(chuàng)建新數(shù)組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對(duì)應(yīng)二進(jìn)制 ‘000...00011111’ int sizeMask = newCapacity - 1 ; // 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置 for (int i = 0 ; i < oldCapacity ; i++) { // e 是鏈表的第一個(gè)元素 HashEntry<K,V> e = oldTable[i]; if (e ! = null ) { HashEntry<K,V> next = e.next; // 計(jì)算應(yīng)該放置在新數(shù)組中的位置, // 假設(shè)原數(shù)組長(zhǎng)度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null ) // 該位置處只有一個(gè)元素,那比較好辦 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是鏈表表頭 HashEntry<K,V> lastRun = e; // idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置 int lastIdx = idx; // 下面這個(gè) for 循環(huán)會(huì)找到一個(gè) lastRun 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)之后的所有元素是將要放到一起的 for (HashEntry<K,V> last = next; last ! = null ; last = last.next) { int k = last.hash & sizeMask; if (k ! = lastIdx) { lastIdx = k; lastRun = last; } } // 將 lastRun 及其之后的所有節(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置 newTable[lastIdx] = lastRun; // 下面的操作是處理 lastRun 之前的節(jié)點(diǎn), // 這些節(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中 for (HashEntry<K,V> p = e; p ! = lastRun; p = p.next) { V v = p.value ; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新來(lái)的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
這里的擴(kuò)容比之前的 HashMap 要復(fù)雜一些,代碼難懂一點(diǎn)。 上面有兩個(gè)挨著的 for 循環(huán),第一個(gè) for 有什么用呢?
仔細(xì)一看發(fā)現(xiàn),如果沒(méi)有第一個(gè) for 循環(huán),也是可以工作的,但是,這個(gè) for 循環(huán)下來(lái),如果 lastRun 的后面還有比較多的節(jié)點(diǎn),那么這次就是值得的。 因?yàn)槲覀冎恍枰寺?nbsp;lastRun 前面的節(jié)點(diǎn),后面的一串節(jié)點(diǎn)跟著 lastRun 走就是了,不需要做任何操作。
我覺(jué)得 Doug Lea 的這個(gè)想法也是挺有意思的,不過(guò)比較壞的情況就是每次 lastRun 都是鏈表的最后一個(gè)元素或者很靠后的元素,那么這次遍歷就有點(diǎn)浪費(fèi)了。
不過(guò) Doug Lea 也說(shuō)了,根據(jù)統(tǒng)計(jì),如果使用默認(rèn)的閾值,大約只有 1/6 的節(jié)點(diǎn)需要克隆。
get 過(guò)程分析 相對(duì)于 put 來(lái)說(shuō),get 真的不要太簡(jiǎn)單。
public V get (Object key ) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; // 1. hash 值 int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 2. 根據(jù) hash 找到對(duì)應(yīng)的 segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) ! = null && (tab = s.table) ! = null ) { // 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e ! = null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals (k))) return e.value ; } } return null ; }
并發(fā)問(wèn)題分析 現(xiàn)在我們已經(jīng)說(shuō)完了 put 過(guò)程和 get 過(guò)程,我們可以看到 get 過(guò)程中是沒(méi)有加鎖的,那自然我們就需要去考慮并發(fā)問(wèn)題。