MemeoryStore
上一節(jié),我們對BlockManager的主要寫入方法做了一個整理,知道了BlockMananger的主要寫入邏輯,以及對于塊信息的管理。但是,由于spark的整個存儲模塊是在是很龐大,而且很多細節(jié)的邏輯錯綜復(fù)雜,如果對于每個細節(jié)都刨根問底,一來精力有限,二來感覺也沒有太大的必要,當(dāng)然如果時間允許肯定是越詳細越好,在這里,我的分析的主要目的是理清存儲模塊的重點邏輯,希望能夠提綱契領(lǐng)地把各個模塊的脈絡(luò)領(lǐng)出來,建立起對spark-core中各模塊的整體認(rèn)知,這樣我們在遇到一些問題的時候就能夠很快地知道應(yīng)該從何處下手,從哪個具體的模塊去找問題。
好了廢話不多說,本節(jié)接著上一節(jié)。上一篇,我們分析了BlockManager的幾個主要的存儲方法,發(fā)現(xiàn)BlockManager主要依靠內(nèi)部的兩個組件MemoryStore和DiskStore來進行實際的數(shù)據(jù)寫入和塊的管理。
本節(jié),我們就來看一下MemoryStore這個組件。
不過,我還是延續(xù)我一貫的風(fēng)格,從外部對一個類的方法調(diào)用為切入點分析這個類的作用和邏輯。
所以,我們先來看一下上一節(jié)對于MemoryStore的主要的方法調(diào)用的總結(jié):
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes
memoryStore.putIteratorAsValues
這個方法主要是用于存儲級別是非序列化的情況,即直接以java對象的形式將數(shù)據(jù)存放在jvm堆內(nèi)存上。我們都知道,在jvm堆內(nèi)存上存放大量的對象并不是什么好事,gc壓力大,擠占內(nèi)存,可能引起頻繁的gc,但是也有明顯的好處,就是省去了序列化和反序列化耗時,而且直接從堆內(nèi)存取數(shù)據(jù)顯然比任何其他方式(磁盤和直接內(nèi)存)都要快很多,所以對于內(nèi)存充足且要緩存的數(shù)據(jù)量本省不是很大的情況,這種方式也不失為一種不錯的選擇。
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
// 用于存儲java對象的容器
val valuesHolder = new DeserializedValuesHolder[T](classTag)
putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
// 存儲成功
case Right(storedSize) => Right(storedSize)
// 存儲失敗的情況
case Left(unrollMemoryUsedByThisBlock) =>
// ValuesHolder內(nèi)部的數(shù)組和vector會相互轉(zhuǎn)換
// 數(shù)據(jù)寫入完成后會將vector中的數(shù)據(jù)轉(zhuǎn)移到數(shù)組中
val unrolledIterator = if (valuesHolder.vector != null) {
valuesHolder.vector.iterator
} else {
valuesHolder.arrayValues.toIterator
}
// 返回寫入一半的迭代器、
// 外部調(diào)用者一半會選擇關(guān)閉這個迭代器以釋放被使用的內(nèi)存
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = unrolledIterator,
rest = values))
}
}
這個方法的邏輯很簡單,作用也比較單一,主要是對實際存儲方法putIterator的返回結(jié)果做處理,如果失敗的話,就封裝一個PartiallyUnrolledIterator返回給外部調(diào)用這個,調(diào)用這個一般需要將這個寫入一半的迭代器關(guān)閉。
MemoryStore.putIterator
這個方法看似很長,其實邏輯相對簡單,主要做的事就是把數(shù)據(jù)一條一條往ValuesHolder中寫,并周期性地檢查內(nèi)存,如果內(nèi)存不夠就通過內(nèi)存管理器MemoryManager申請內(nèi)存,每次申請當(dāng)前內(nèi)存量的1.5倍。
最后,將ValuesHolder中的數(shù)據(jù)轉(zhuǎn)移到一個數(shù)組中(其實數(shù)據(jù)在SizeTrackingVector中也是以數(shù)組的形式存儲,只不過SizeTrackingVector對象內(nèi)部處理數(shù)組還有一些其他的簿記量,更為關(guān)鍵的是我們需要將存儲的數(shù)據(jù)以同一的接口進行包裝,以利于MemoryStore進行同一管理)。最后還有關(guān)鍵的一步,就是釋放展開內(nèi)存,重新申請存儲內(nèi)存。
此外,這個過程中有使用到memoryManager,具體的方法調(diào)用是:
memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
------------------------------分割線------------------------------
private def putIterator[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode,
valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用于數(shù)據(jù)在內(nèi)存展開的初始的內(nèi)存使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 檢查內(nèi)存的頻率,每寫這么多條數(shù)據(jù)就會檢查一次是否需要申請額外的內(nèi)存
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 內(nèi)存閾值,開始時等于初始閾值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 內(nèi)存增長因子,每次申請的內(nèi)存是當(dāng)前內(nèi)存的這個倍數(shù)
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 當(dāng)前的塊使用的內(nèi)存大小
var unrollMemoryUsedByThisBlock = 0L
// Request enough memory to begin unrolling
// 首先進行初始的內(nèi)存申請,向MemoryManager申請內(nèi)存
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
// 如果成功申請到內(nèi)存,則累加記錄
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 循環(huán)將每條數(shù)據(jù)寫入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
valuesHolder.storeValue(values.next())
// 如果寫入數(shù)據(jù)的條數(shù)達到一個周期,那么就檢查一下是否需要申請額外的內(nèi)存
if (elementsUnrolled % memoryCheckPeriod == 0) {
// 通過valuesHolder獲取已經(jīng)寫入的數(shù)據(jù)的評估大小
// 注意,這里的數(shù)據(jù)大小只是估計值,并不是十分準(zhǔn)確
// 具體如何進行估算的可以看valuesHolder內(nèi)部實現(xiàn)
val currentSize = valuesHolder.estimatedSize()
// If our vector's size has exceeded the threshold, request more memory
// 如果已寫入的數(shù)據(jù)大小超過了當(dāng)前閾值
if (currentSize >= memoryThreshold) {
// 這里每次申請的內(nèi)存量都是不一樣的
// 每次申請的內(nèi)存是當(dāng)前已使用內(nèi)存的1.5倍(默認(rèn))
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
// 記錄累積申請的內(nèi)存量
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
// 目前已經(jīng)向內(nèi)存管理器申請的內(nèi)存量
memoryThreshold += amountToRequest
}
}
// 記錄插入的數(shù)據(jù)條數(shù)
elementsUnrolled += 1
}
// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 如果keepUnrolling為true,說明順利地將所有數(shù)據(jù)插入,
// 并未遇到申請內(nèi)存失敗的情況
if (keepUnrolling) {
// 將內(nèi)部的數(shù)據(jù)轉(zhuǎn)移到一個數(shù)組中
val entryBuilder = valuesHolder.getBuilder()
// 數(shù)據(jù)在內(nèi)存中的精確大小
val size = entryBuilder.preciseSize
// 實際的大小可能大于申請的內(nèi)存量
// 因此根據(jù)實際大小還要再申請額外的內(nèi)存
if (size > unrollMemoryUsedByThisBlock) {
val amountToRequest = size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
if (keepUnrolling) {
// 獲取MemoryEntry對象,該對象是對插入數(shù)據(jù)的包裝
val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
// 這一步主要是釋放申請的展開內(nèi)存
// 然后申請存儲內(nèi)存
// 這里需要弄清楚展開內(nèi)存的概念
// 展開狀態(tài)指的是對象在內(nèi)存中處于一種比較松散的狀態(tài),這樣的狀態(tài)方便做一些管理如統(tǒng)計大小等
// 而隨后將對象轉(zhuǎn)移到數(shù)組中,處于一種比較緊實的狀態(tài),數(shù)組相對來說占用的額外內(nèi)存是比較小的
// 一個數(shù)組只是一個對象,只有一個對象頭,可以用來管理大量的對象
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
// 申請存儲內(nèi)存
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
// 放入map中管理起來
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
// 如果失敗,返回已經(jīng)申請的展開內(nèi)存
Left(unrollMemoryUsedByThisBlock)
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
Left(unrollMemoryUsedByThisBlock)
}
}
memoryStore.putIteratorAsBytes
我們再看另一個方法。套路基本和putIteratorAsValues是一樣一樣的。
最大的區(qū)別在于ValuesHolder類型不同。非序列化形式存儲使用的是DeserializedMemoryEntry,而序列化形式存儲使用的是SerializedMemoryEntry。
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 字節(jié)數(shù)組的塊大小,默認(rèn)是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
} else {
initialMemoryThreshold.toInt
}
// 字節(jié)數(shù)組的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
memoryMode, serializerManager)
putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
// 部分展開,部分以序列化形式存儲的block
Left(new PartiallySerializedBlock(
this,
serializerManager,
blockId,
valuesHolder.serializationStream,
valuesHolder.redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
valuesHolder.bbos,
values,
classTag))
}
}
memoryStore.putBytes
我們再來看另一個被外部調(diào)用用來插入數(shù)據(jù)的方法。很簡單,不說了。
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向內(nèi)存管理器申請內(nèi)存
// 這里申請的是存儲內(nèi)存,因為要插入的字節(jié)數(shù)組,
// 所以不需要再展開,也就不需要申請展開內(nèi)存
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
// 這里直接構(gòu)建了一個SerializedMemoryEntry
// 并放到map中管理起來
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
true
} else {
false
}
}
小結(jié)
通過對上面的三個方法,其實主要是前兩個方法的分析,我們發(fā)現(xiàn),除了對內(nèi)存進行簿記管理之外,以及通過內(nèi)存管理器申請內(nèi)存之外,插入數(shù)據(jù)最主要的工作其實都是有ValuesHolder對象來完成的。
ValuesHolder特質(zhì)有兩個實現(xiàn)類:DeserializedValuesHolder和SerializedValuesHolder。
DeserializedValuesHolder
DeserializedValuesHolder對象內(nèi)部有兩個成員:vector,是一個SizeTrackingVector;arrayValues,是一個存放值的數(shù)組,用于在所有數(shù)據(jù)插入后,將主句轉(zhuǎn)移到一個數(shù)組中,方便包裝成一個MemoryEntry對象。大部分工作是有SizeTrackingVector完成的。
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
override def storeValue(value: T): Unit = {
vector += value
}
override def estimatedSize(): Long = {
vector.estimateSize()
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
arrayValues = vector.toArray
vector = null
override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
override def build(): MemoryEntry[T] =
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}
SizeTracker
上面提到的SizeTrackingVector繼承了這個特質(zhì),除了這個特質(zhì),還集成了PrimitiveVector類,但是PrimitiveVector類基本上就是對一個數(shù)組的簡單包裝。
SizeTrackingVector最重要的功能:追蹤對象的大小,就是在SizeTracker特之中實現(xiàn)的。
我大致說一下這個特質(zhì)是如何實現(xiàn)對象大小跟蹤和估算的,代碼實現(xiàn)也并不復(fù)雜,感興趣的可以看一看,限于篇幅這里就不貼了。
- 每插入一定數(shù)量的數(shù)據(jù)(姑且稱之為周期),就會對當(dāng)前的對象進行一次取樣,而這個取樣的周期會越來越長,以1.1倍的速率增長;
- 取樣就是計算對象大小,并與前一次取樣作比較,而且只會保留最近兩次的取樣數(shù)據(jù);
- 每次取樣其實就是獲取兩個數(shù)據(jù),當(dāng)前對象大小,當(dāng)前插入的數(shù)據(jù)條數(shù);
- 這樣與上一次取樣一比較,就能夠計算出每條數(shù)據(jù)的大小了;
- 最后,在返回整個對象大小時,是拿最近一次取樣時記錄下的對象大小,以及根據(jù)最近的情況估算的每條數(shù)據(jù)的大小乘以自從上次取樣以來新插入的數(shù)據(jù)量,二者相加作為對象大小的估算值,
可見這么做并不是什么精確,但是由于是抽樣,而且抽樣周期越往后面越長,所以對于數(shù)據(jù)插入的效率影響很小,而且這種不精確性其實在后續(xù)的內(nèi)存檢查過程中是有考慮到的。在所有數(shù)據(jù)插入完的收尾工作中,會對對象大小做一次精確計算。此外,熟悉spark內(nèi)存管理的同學(xué)應(yīng)該知道,其實spark一般會配置一個安全因子(一般是0.9),也就是說只是用配置的內(nèi)存大小的90%,就是為了盡可能地減少這種不精確的內(nèi)存估算造成OOM的可能性。
SerializedValuesHolder
private class SerializedValuesHolder[T](
blockId: BlockId,
chunkSize: Int,
classTag: ClassTag[T],
memoryMode: MemoryMode,
serializerManager: SerializerManager) extends ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
// 調(diào)用unsafe的本地方法申請直接內(nèi)存
// 這個方法之所以沒有調(diào)用ByteBuffer.allocateDirect方法
// 是因為這個方法分配的直接內(nèi)存大小收到參數(shù)MaxDirectMemorySize限制
// 所以這里繞過ByteBuffer.allocateDirect方法,通過反射和unsafe類創(chuàng)建直接內(nèi)存對象
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val redirectableStream = new RedirectableOutputStream
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
// 包裝壓縮流和序列化流
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// 寫入方法,寫入的對象經(jīng)過序列化,壓縮,
// 然后經(jīng)過ChunkedByteBufferOutputStream被分割成一個個的字節(jié)數(shù)組塊
override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}
override def estimatedSize(): Long = {
bbos.size
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
serializationStream.close()
override def preciseSize(): Long = bbos.size
override def build(): MemoryEntry[T] =
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
}
大概看一下,主要的邏輯很簡單,這里面有幾個注意點:
- 對于直接內(nèi)存分配,spark并沒有使用jdk的高級api,而是反射配合unsafe類分配直接內(nèi)存,這樣可以繞過jvm參數(shù)MaxDirectMemorySize的限制,這也體現(xiàn)了spark的作者盡可能的降低用戶使用難度
- 另外,我們看到序列化流其實經(jīng)過了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個點,感興趣的話可以深究,序列化和壓縮如果深入了解都是很大的課題,所以這里也僅僅是蜻蜓點水,不深究了。
總結(jié)
MemoryStore.scala這個文件中乍看代碼有八百多行,但是其實很大部分代碼是一些輔助類,比較核心的寫入邏輯也就是前面提到的幾個方法,再加上核心的兩個類DeserializedValuesHolder和SerializedValuesHolder實現(xiàn)了以對象或字節(jié)數(shù)組的形式存儲數(shù)據(jù)。
|