Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Spark UnifiedMemoryManagerÄÚ´æ¹ÜÀíÄ£ÐÍ·ÖÎö
 
À´Ô´£º¼òÊé ·¢²¼ÓÚ£º2017-7-28
  3219  次浏览      27
 

SparkµÄÄÚ´æÊ¹Ó㬴óÌåÉÏ¿ÉÒÔ·ÖΪÁ½ÀࣺExecutionÄÚ´æºÍStorageÄÚ´æ¡£ÔÚSpark 1.5°æ±¾Ö®Ç°£¬ÄÚ´æ¹ÜÀíʹÓõÄÊÇStaticMemoryManager£¬¸ÃÄÚ´æ¹ÜÀíÄ£ÐÍ×î´óµÄÌØµã¾ÍÊÇ£¬¿ÉÒÔΪExecutionÄÚ´æÇøÓëStorageÄÚ´æÇøÅäÖÃÒ»¸ö¾²Ì¬µÄboundary£¬ÕâÖÖ·½Ê½ÊµÏÖÆðÀ´±È½Ï¼òµ¥£¬µ«ÊÇ´æÔÚһЩÎÊÌ⣺

  1. ûÓÐÒ»¸öºÏÀíµÄĬÈÏÖµÄܹ»ÊÊÓ¦²»Í¬¼ÆË㳡¾°ÏµÄWorkload
  2. ÄÚ´æµ÷ÓÅÀ§ÄÑ£¬ÐèÒª¶ÔSparkÄÚ²¿Ô­Àí·Ç³£ÊìϤ²ÅÄÜ×öºÃ
  3. ¶Ô²»ÐèÒªCacheµÄApplicationµÄ¼ÆË㳡¾°£¬Ö»ÄÜʹÓúÜÉÙÒ»²¿·ÖÄÚ´æ

ΪÁ˿˷þÉÏÊöÌáµ½µÄÎÊÌ⣬¾¡Á¿Ìá¸ßSpark¼ÆËãµÄͨÓÃÐÔ£¬½µµÍÄÚ´æµ÷ÓÅÄѶȣ¬¼õÉÙOOMµ¼ÖµÄʧ°ÜÎÊÌ⣬´ÓSpark 1.6°æ±¾¿ªÊ¼£¬ÐÂÔöÁËUnifiedMemoryManager£¨Í³Ò»ÄÚ´æ¹ÜÀí£©ÄÚ´æ¹ÜÀíÄ£Ð͵ÄʵÏÖ¡£UnifiedMemoryManagerÒÀÀµµÄһЩ×é¼þÀ༰Æä¹ØÏµ£¬ÈçÏÂÀàͼËùʾ£º

´ÓÉÏͼ¿ÉÒÔ¿´³ö£¬×îÖ±½Ó×îºËÐĵľÍÊÇStorageMemoryPool ºÍExecutionMemoryPool£¬ËüÃÇʵÏÖÁ˶¯Ì¬ÄÚ´æ³Ø£¨Memory Pool£©µÄ¹¦ÄÜ£¬Äܹ»¶¯Ì¬µ÷ÕûStorageÄÚ´æÇøÓëExecutionÄÚ´æÇøÖ®¼äµÄSoft boundary£¬Ê¹ÄÚ´æ¹ÜÀí¸ü¼ÓÁé»î¡£ÏÂÃæÎÒÃÇ´ÓÄÚ´æ²¼¾ÖºÍÄÚ´æ¿ØÖÆÁ½¸ö·½Ã棬À´·ÖÎöUnifiedMemoryManagerÄÚ´æ¹ÜÀíÄ£ÐÍ¡£

ÄÚ´æ²¼¾Ö

UnifiedMemoryManagerÊÇMemoryManagerµÄÒ»ÖÖʵÏÖ£¬ÊÇ»ùÓÚStaticMemoryManagerµÄ¸Ä½ø¡£ÕâÖÖÄ£ÐÍÒ²Êǽ«Ä³¸öÖ´ÐÐTaskµÄExecutor JVMÄÚ´æ»®·ÖΪÁ½ÀàÄÚ´æÇøÓò£º

  • StorageÄÚ´æÇø
    StorageÄڴ棬ÓÃÀ´»º´æTaskÊý¾Ý¡¢ÔÚSpark¼¯ÈºÖд«Ê䣨Propagation£©ÄÚ²¿Êý¾Ý¡£
  • ExecutionÄÚ´æÇø
    ExecutionÄڴ棬ÓÃÓÚÂú×ãShuffle¡¢Join¡¢Sort¡¢Aggregation¼ÆËã¹ý³ÌÖжÔÄÚ´æµÄÐèÇó¡£

ÕâÖÖеÄÄÚ´æ¹ÜÀíÄ£ÐÍ£¬ÔÚStorageÄÚ´æÇøÓëExecutionÄÚ´æÇøÖ®¼ä³éÏó³öÒ»¸öSoft boundary£¬Äܹ»Âú×㵱ijһ¸öÄÚ´æÇøÖÐÄÚ´æÓÃÁ¿²»×ãµÄʱºò£¬¿ÉÒÔ´ÓÁíÒ»¸öÄÚ´æÇøÖнèÓá£ÎÒÃÇ¿ÉÒÔÀí½âΪ£¬ÉÏÃæStorageÄÚ´æºÍExecution¶ÑÄÚ´æÊÇÊÜSpark¹ÜÀíµÄ£¬¶øÇÒÿһ¸öÄÚ´æÇøÊÇ¿ÉÒÔ¶¯Ì¬ÉìËõµÄ¡£ÕâÑùµÄºÃ´¦ÊÇ£¬µ±Ä³Ò»¸öÄÚ´æÇøÄÚ´æÊ¹ÓÃÁ¿´ïµ½³õʼ·ÖÅäÖµ£¬Èç¹û²»Äܹ»¶¯Ì¬ÉìËõ£¬²»ÄÜÔÚÁ½ÀàÄÚ´æÇøÖ®¼ä½øÐж¯Ì¬µ÷Õû£¨Borrow£©£¬»òÕßÈç¹ûij¸öTask¼ÆËãµÄÊý¾ÝÁ¿ºÜ´ó³¬¹ýÏÞÖÆ£¬¾Í»á³öÏÖOOMÒì³£µ¼ÖÂTaskÖ´ÐÐʧ°Ü¡£Ó¦¸Ã˵£¬ÔÚÒ»¶¨³Ì¶ÈÉÏ£¬UnifiedMemoryManagerÄÚ´æ¹ÜÀíÄ£ÐͽµµÍÁË·¢ÉúOOMµÄ¸ÅÂÊ¡£

ÎÒÃÇÖªµÀ£¬ÔÚSpark ApplicationÌá½»ÒÔºó£¬×îÖÕ»áÔÚWorkerÉÏÆô¶¯¶ÀÁ¢µÄExecutor JVM£¬Task¾ÍÔËÐÐÔÚExecutorÀïÃæ¡£ÔÚÒ»¸öExecutor JVMÄÚ²¿£¬»ùÓÚUnifiedMemoryManagerÕâÖÖÄÚ´æ¹ÜÀíÄ£ÐÍ£¬¶ÑÄÚ´æµÄ²¼¾ÖÈçÏÂͼËùʾ£º

ÉÏͼÖУ¬systemMemoryÊÇExecutor JVMµÄÈ«²¿¶ÑÄڴ棬ÔÚÈ«²¿¶ÑÄÚ´æ»ù´¡ÉÏreservedMemoryÊÇÔ¤ÁôÄڴ棬ĬÈÏ300M£¬ÔòÓÃÓÚSpark¼ÆËãʹÓöÑÄÚ´æ´óСĬÈÏÊÇ£º

maxHeapMemory = (systemMemory - reservedMemory) * 0.6

ÊÜSpark¹ÜÀíµÄ¶ÑÄڴ棬ʹÓÃÈ¥³ýÔ¤ÁôÄÚ´æºóµÄ¡¢Ê£ÓàÄÚ´æµÄ°Ù·Ö±È£¬¿ÉÒÔͨ¹ý²ÎÊýspark.memory.fractionÀ´ÅäÖã¬Ä¬ÈÏÖµÊÇ0.6¡£Executor JVM¶ÑÄڴ棬ȥ³ýÔ¤ÁôµÄreservedMemoryÄڴ棬ĬÈÏʣ϶ÑÄÚ´æµÄ60%ÓÃÓÚexecutionºÍstorageÕâÁ½Àà¶ÑÄڴ棬ĬÈÏÇé¿öÏ£¬ExecutionºÍStorageÄÚ´æÇø¸÷Õ¼50%£¬Õâ¸öÒ²¿ÉÒÔͨ¹ý²ÎÊýspark.memory.storageFractionÀ´ÅäÖã¬Ä¬ÈÏÖµÊÇ0.5¡£±ÈÈ磬ÔÚËùÓвÎÊýʹÓÃĬÈÏÖµµÄÇé¿öÏ£¬ÎÒÃǵÄExecutor JVMÄÚ´æÎªÖ¸¶¨Îª2G£¬ÄÇôUnified Memory´óСΪ(1024 * 2 ¨C 300) * 0.6 = 1048MB£¬ÆäÖУ¬ExecutionºÍStorageÄÚ´æÇø´óС·Ö±ðΪ1048 * 0.5 = 524MB¡£

ÁíÍ⣬»¹ÓÐÒ»¸öÓÃÀ´±£Ö¤Spark ApplicationÄܹ»¼ÆËãµÄ×îСExecutor JVMÄÚ´æ´óСÏÞÖÆ£¬¼´ÎªminSystemMemory = reservedMemory * 1.5 = 300 * 1.5 = 450MB£¬ÎÒÃǼÙÉèExecutor JVMÅäÖÃÁËÕâ¸öĬÈÏ×îСÏÞÖÆÖµ450MB£¬ÔòÊÜSpark¹ÜÀíµÄ¶ÑÄÚ´æ´óСΪ(450 ¨C 300) * 0.6 = 90MB£¬ÆäÖÐExecutionºÍStorageÄÚ´æ´óС·Ö±ðΪ90 * 0.5 = 45MB£¬ÕâÖÖÇé¿ö¶ÔһЩСÄÚ´æÓÃÁ¿µÄSpark¼ÆËãÒ²Äܹ»ºÜºÃµÄÖ§³Ö¡£

ÉÏÃæ£¬ÎÒÃÇÏêϸ˵Ã÷ÁËÊÜSpark¹ÜÀíµÄ¶ÑÄڴ棨OnHeap Memory£©µÄ²¼¾Ö£¬UnifiedMemoryManagerÒ²Äܹ»¶Ô·Ç¶ÑÄڴ棨OffHeap Memory£©½øÐйÜÀí¡£Spark¶ÑÄÚ´æºÍ·Ç¶ÑÄÚ´æµÄ²¼¾Ö£¬ÈçÏÂͼËùʾ£º

ͨ¹ýÉÏͼ¿ÉÒÔ¿´µ½£¬·Ç¶ÑÄڴ棨OffHeap Memory£©Ä¬ÈÏ´óСÅäÖÃֵΪ0£¬±íʾ²»Ê¹Ó÷ǶÑÄڴ棬¿ÉÒÔͨ¹ý²ÎÊýspark.memory.offHeap.sizeÀ´ÉèÖ÷ǶÑÄÚ´æµÄ´óС¡£ÎÞÂÛÊǶԶÑÄڴ棬»¹ÊǶԷǶÑÄڴ棬¶¼·ÖΪExecutionÄÚ´æºÍStorageÄÚ´æÁ½²¿·Ö£¬ËûÃǵķÖÅä´óС±ÈÀýͨ¹ý²ÎÊýspark.memory.storageFractionÀ´¿ØÖÆ£¬Ä¬ÈÏÊÇ0.5¡£

ÄÚ´æ¿ØÖÆ

ͨ¹ýÉÏÃæ£¬ÎÒÃÇÁ˽âÁËUnifiedMemoryManagerÕâÖÖÄÚ´æ¹ÜÀíÄ£Ð͵ÄÄÚ´æ²¼¾Ö×´¿ö¡£½ÓÏÂÀ´£¬ÎÒÃÇ¿´Ò»Ï£¬Í¨¹ýUnifiedMemoryManagerµÄAPI£¬ÈçºÎ¶ÔÄÚ´æ½øÐпØÖÆ£¨·ÖÅä/»ØÊÕ£©¡£ÄÚ´æµÄ¿ØÖÆ£¬Ò²¶ÔÓ¦ÓÚExecutionÄÚ´æÓëStorageÄڴ棬·Ö±ðÓÐÒ»¸öStorageMemoryPool ºÍExecutionMemoryPool£¬ÔÚʵÏÖÀàUnifiedMemoryManagerÖпÉÒÔ¿´µ½Í¨¹ýÕâÁ½¸öMemoryPoolʵÏÖÀ´¿ØÖÆÄÚ´æ´óСµÄÉìËõ£¨Increment/Decrement£©¡£

»ñÈ¡µ±Ç°¶ÑÉϵÄ×î´ó¿ÉÓÃStorageÄڴ棬ÈçÏÂmaxOnHeapStorageMemory·½·¨Ëùʾ£º

override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

¿ÉÒÔ¿´µ½£¬maxHeapMemory±íʾ¶ÑÉÏ¿ÉÓõÄExecutionÄÚ´æÓëStorageÄÚ´æ×ÜÁ¿Ö®ºÍ£¬¼õÈ¥ExecutionÄÚ´æÖÐÒѾ­±»Õ¼ÓõÄÄڴ棬ʣÓàµÄ¶¼ÊǶÑÉϵÄ×î´ó¿ÉÓÃStorageÄÚ´æ¡£

ÔÚUnifiedMemoryManagerÖУ¬Á½Àà×îºËÐĵIJÙ×÷£¬¾ÍÊÇÉêÇë/ÊÍ·ÅStorageÄÚ´æ¡¢ÉêÇë/ÊÍ·ÅExecutionÄڴ棬·Ö±ð˵Ã÷ÈçÏ£º

ÉêÇëStorageÄÚ´æ

ÉêÇëStorageÄÚ´æµÄÂß¼­£¬ÊµÏÖ´úÂëÈçÏÂËùʾ£º

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized { // ΪblockIdÉêÇënumBytes×Ö½Ú´óСµÄÄÚ´æ
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match { // ¸ù¾ÝmemoryModeÖµ£¬·µ»Ø¶ÔÓ¦µÄStorageMemoryPoolÓëExecutionMemoryPool
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
}
if (numBytes > maxMemory) { // Èç¹ûÉêÇëµÄÄÚ´æ´óÓÚ×î´óµÄStorageÄÚ´æÁ¿£¨¶ÔÓ¦ÉÏÃæ·½·¨maxOnHeapStorageMemory()·µ»ØµÄÄÚ´æ´óС£©£¬ÉêÇëʧ°Ü
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) { // Èç¹ûStorageÄÚ´æ¿éÖÐûÓÐ×ã¹»¿ÉÓÃÄÚ´æ¸øblockIdʹÓã¬Ôò¼ÆË㵱ǰStorageÄÚ´æÇøÈ±ÉÙ¶àÉÙÄڴ棬Ȼºó´ÓExecutionÄÚ´æÇøÖнèÓÃ
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
executionPool.decrementPoolSize(memoryBorrowedFromExecution) // ExecutionÄÚ´æÇø¼õµô½èÓÃÄÚ´æÁ¿
storagePool.incrementPoolSize(memoryBorrowedFromExecution) // StorageÄÚ´æÇøÔö¼Ó½èÓÃÄÚ´æÁ¿
}
storagePool.acquireMemory(blockId, numBytes) // Èç¹ûStorageÄÚ´æÇø¿ÉÒÔΪblockId·ÖÅäÄڴ棬ֱ½Ó³É¹¦·ÖÅ䣻·ñÔò£¬Èç¹û´ÓExecutionÄÚ´æÇøÖнèÓõÄÄÚ´æÄܹ»Âú×ãblockId£¬Ôò·ÖÅä³É¹¦£¬²»ÄÜÂú×ãÔò·ÖÅäʧ°Ü¡£
}

Èç¹ûStorageÄÚ´æÇø¿ÉÓÃÄÚ´æÂú×ãÉêÇë´óС£¬ÔòÖ±½Ó³É¹¦·ÖÅäÄڴ棻Èç¹ûStorageÄÚ´æÇø¿ÉÓÃÄÚ´æ´óÓÚ0ÇÒСÓÚÉêÇëµÄÄÚ´æ´óС£¬ÔòÐèÒª´ÓExecutionÄÚ´æÇø½èÓÃÂú×ã·ÖÅä´óСµÄÄڴ棬Èç¹û½èÓóɹ¦£¬ÔòÖ±½Ó³É¹¦·ÖÅäÄڴ棬·ñÔò·ÖÅäʧ°Ü£»Èç¹ûÉêÇëµÄÄڴ泬¹ýÁËStorageÄÚ´æÇøµÄ×î´óÄÚ´æÁ¿£¬Ôò·ÖÅäʧ°Ü¡£

ÁíÍ⣬UnifiedMemoryManager.acquireUnrollMemory()·½·¨ÌṩÁ˶ÔUnrollÄÚ´æµÄÉêÇ룬UnrollÄÚ´æ¾ÍÊÇStorageÄڴ棺

override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, memoryMode)
}

UnrollÄÚ´æ £¬±»ÓÃÀ´ÔÚStorageÄÚ´æÖÐUnroll£¨Õ¹¿ª£©Ö¸¶¨µÄBlockÊý¾Ý¡£

ÊÍ·ÅStorageÄÚ´æ

ÊÍ·ÅStorageÄÚ´æ±È½Ï¼òµ¥£¬Ö»ÐèÒª¸üÐÂStorageÄÚ´æ¼ÆÁ¿±äÁ¿¼´¿É£¬ÈçÏÂËùʾ£º

def releaseMemory(size: Long): Unit = lock.synchronized {
if (size > _memoryUsed) {
logWarning(s"Attempted to release $size bytes of storage " +
s"memory when we only have ${_memoryUsed} bytes")
_memoryUsed = 0
} else {
_memoryUsed -= size
}
}

ÉêÇëExecutionÄÚ´æ

ÉêÇëExecutionÄڴ棬Ïà¶Ô¸´ÔÓһЩ£¬µ÷ÓÃacquireExecutionMemory()·½·¨¿ÉÄÜ»á×èÈû£¬Ö±µ½ExecutionÄÚ´æÇøÓпÉÓÃÄÚ´æÎªÖ¹¡£UnifiedMemoryManagerµÄacquireExecutionMemory()·½·¨ÊµÏÖÈçÏÂËùʾ£º

override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
... ...
executionPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}

ÉÏÃæ´úÂ룬µ÷ÓÃÁËExecutionMemoryPoolµÄacquireMemory()·½·¨£¬¸Ã·½·¨µÄ²ÎÊýÐèÒª2¸öº¯Êý£¨maybeGrowExecutionPoolº¯ÊýÓÃÀ´¿ØÖÆÈçºÎÔö¼ÓExecutionÄÚ´æÇø¶ÔÓ¦PoolµÄ´óС£¬computeMaxExecutionPoolSizeº¯ÊýÓÃÀ´»ñÈ¡µ±Ç°ExecutionÄÚ´æÇø¶ÔÓ¦PoolµÄ´óС£©¡£ExecutionMemoryPoolµÄacquireMemory()·½·¨Ç©Ãû£¬ÈçÏÂËùʾ£º

private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {

ÔÚUnifiedMemoryManagerÄÚ²¿£¬ÊµÏÖÁËÈçºÎ¶¯Ì¬Ôö¼ÓExecutionÄÚ´æÇø¶ÔÓ¦Pool´óСµÄº¯Êý£¬¼´ÎªmaybeGrowExecutionPoolº¯Êý£¬´úÂëÈçÏÂËùʾ£º

def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage = math.max( storagePool.memoryFree, storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) { // ÕâÀïmemoryReclaimableFromStorage´óÓÚ0£¬ËµÃ÷µ±Ç°StorageÄÚ´æÇøÓпÉÓÃÄڴ棬¿ÉÒÔShrink¸ÃPoolµÄÄڴ棬×÷ΪExecutionÄÚ´æÇøµÄ¿ÉÓÃÄÚ´æÊ¹ÓÃ
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) // ¶ÔStorageÄÚ´æÇø½øÐÐShrink²Ù×÷£¬Èç¹û¿ÉÓÃÄÚ´æ´óÓÚÇëÇóÄÚ´æextraMemoryNeeded£¬ÔòÖ±½Ó½«StorageÄÚ´æÇøÄÚ´æShrink´óСΪextraMemoryNeeded£¬·ñÔòShrink´óСΪStorageÄÚ´æÇøµÄÈ«²¿¿ÉÓÃÄÚ´æ´óС
storagePool.decrementPoolSize(spaceToReclaim) // StorageÄÚ´æÇø¼õµô½èÓÃÄÚ´æÁ¿
executionPool.incrementPoolSize(spaceToReclaim) // ExecutionÄÚ´æÇøÔö¼Ó½èÓÃÄÚ´æÁ¿
}
}
}

ÐèҪ˵Ã÷µÄÊÇ£¬ÉÏÃæµÄstoragePool.poolSizeµÄ´óС¿ÉÄÜ´óÓÚStorageÄÚ´æÇø³õʼ×î´óÄÚ´æ´óС£¬Ö÷ÒªÊÇͨ¹ý½èÓÃExecutionÄÚ´æÇøµÄÄÚ´æµ¼Öµġ£ÕâÀstoragePool.freeSpaceToShrinkPool()·½·¨»áShrinkµôStorageÄÚ´æÇø¿ÉÓÃÄڴ棬ÎÒÃÇ¿ÉÒÔ¿´ÏÂStorageMemoryPoolÖÐÈçºÎShrink StorageÄڴ棬·½·¨ÈçÏÂËùʾ£º

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory // StorageÄÚ´æÇøÐèÒªÊÍ·ÅremainingSpaceToFree´óСµÄÄÚ´æ
if (remainingSpaceToFree > 0) { // ´óÓÚ0±íʾµ±Ç°StorageÄÚ´æÇøÒѾ­ÎÞ¿ÉÓÃÄڴ棬ÐèҪͨ¹ýÇåÀíStorageÄÚ´æÇøµÄblockÀ´ÊµÏÖShrink²Ù×÷
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // ͨ¹ýÇåÀíStorageÄÚ´æÇøµÄblockÊͷŵÄÄÚ´æ´óС
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else { // remainingSpaceToFree<=0˵Ã÷µ±Ç°StorageÄÚ´æÇø¿ÉÓÃÄÚ´æ×ã¹»£¬²»ÐèҪͨ¹ýÇåÀí»º´æµÄBlockÀ´ÊÍ·ÅÄÚ´æ
spaceFreedByReleasingUnusedMemory
}
}

MemoryStoreÈçºÎevictBlocksToFreeSpace£¬¿ÉÒÔ²éÔÄMemoryStoreÀàÔ´Â룬ÕâÀïÔÝʱ²»×ö˵Ã÷¡£

×îºó£¬ÎÒÃÇ˵Ã÷ExecutionMemoryPool.acquireMemory()·½·¨ÓëExecutionMemoryPool.releaseMemory()·½·¨µÄʵÏÖ¡£ÔÚ˵Ã÷·½·¨ÊµÏÖÂß¼­Ö®Ç°£¬ÎÒÃÇÏÈ˵Ã÷Ò»ÏÂExecutionÄÚ´æÇøÄÚ´æ·ÖÅäµÄ»ù±¾Ô­Ôò£º

Èç¹ûÓÐN¸ö»îÔ¾£¨Active£©µÄTaskÔÚÔËÐУ¬ExecutionMemoryPoolÐèÒª±£Ö¤Ã¿¸öTaskÔÚ½«Öмä½á¹ûÊý¾ÝSpillµ½´ÅÅÌ֮ǰ£¬ÖÁÉÙÄܹ»ÉêÇëµ½µ±Ç°ExecutionÄÚ´æÇø¶ÔÓ¦µÄPoolÖÐ1/2N´óСµÄÄÚ´æÁ¿£¬ÖÁ¶àÊÇ1/N´óСµÄÄÚ´æ¡£ÕâÀïNÊǶ¯Ì¬±ä»¯µÄ£¬ÒòΪ¿ÉÄÜÓÐеÄTask±»Æô¶¯£¬Ò²ÓпÉÄÜTaskÔËÐÐÍê³ÉÊÍ·Å×ÊÔ´£¬ËùÒÔExecutionMemoryPool»á³ÖÐø¸ú×ÙExecutionMemoryPoolÄÚ²¿Task¼¯ºÏmemoryForTaskµÄ±ä»¯£¬²¢²»¶ÏµØÖØÐ¼ÆËã·ÖÅä¸øÃ¿¸öTaskµÄÕâÁ½¸öÄÚ´æÁ¿µÄÖµ£º1/2NºÍ1/N¡£

ΪÁË´úÂë½ô´ÕÇåÎú£¬ÎÒ°ÑExecutionMemoryPool.acquireMemory()·½·¨Ô´ÂëÖв»±ØÒªµÄ×¢ÊÍÈ¥µôÁË£¬´úÂëÈçÏÂËùʾ£º

private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
if (!memoryForTask.contains(taskAttemptId)) { // ExecutionMemoryPoolÄÚ²¿Î¬»¤ÁËÒ»¸öHashMap<TaskAttempID, ÄÚ´æÕ¼ÓÃ×Ö½ÚÊý>
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}
while (true) {
val numActiveTasks = memoryForTask.keys.size // µ±Ç°»îÔ¾µÄTaskÊýÁ¿
val curMem = memoryForTask(taskAttemptId) // µ±Ç°TaskʹÓõÄÄÚ´æÁ¿
maybeGrowPool(numBytes - memoryFree) // Èç¹ûÐèÒª£¬Í¨¹ýShrink StorageÄÚ´æÇø¶ÔÓ¦µÄPoolÄÚ´æÀ´Ôö¼ÓExecutionÄÚ´æÇøÄÚ´æ´óС
val maxPoolSize = computeMaxPoolSize() // ¼ÆË㵱ǰExecutionÄÚ´æÇø¶ÔÓ¦PoolµÄ´óС
val maxMemoryPerTask = maxPoolSize / numActiveTasks // ¼ÆËã1/N£º½«µ±Ç°ExecutionÄÚ´æÇø¶ÔÓ¦PoolµÄ´óС£¬Æ½¾ù·ÖÅ䏸ËùÓлîÔ¾µÄTask£¬µÃµ½Ã¿¸öTaskÄܹ»»ñÈ¡µ½µÄ×î´óÄÚ´æ´óС
val minMemoryPerTask = poolSize / (2 * numActiveTasks) // ¼ÆËã1/2N£ºÃ¿¸öTaskÄܹ»»ñÈ¡µ½µÄ×îСÄÚ´æ´óС
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // ÔÊÐíµ±Ç°Task»ñÈ¡µ½×î´óÄڴ棨·¶Î§£º0 <= X <= 1 / numActiveTasks £©
val toGrant = math.min(maxToGrant, memoryFree) // ¼ÆËãÄÜ·ÖÅ䏸µ±Ç°TaskµÄÄÚ´æ´óС
// Èç¹ûµ±Ç°TaskÎÞ·¨»ñÈ¡µ½1 / (2 * numActiveTasks)µÄÄڴ棬²¢ÇÒÄÜ·ÖÅ䏸µ±Ç°TaskµÄÄÚ´æ´óСÎÞ·¨Âú×ãÉêÇëµÄÄÚ´æÁ¿£¬Ôò×èÈûµÈ´ýÆäËûTaskÊÍ·ÅÄÚ´æºóÔÚlockÉÏ֪ͨ
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait() // ÔÚExecutionMemoryPool.releaseMemory()·½·¨Öлá֪ͨÆäËûÉêÇëÄÚ´æ²¢ÔÚlockÉÏwaitµÄTask£¬ÄÚ´æÒѾ­ÊÍ·Å
} else {
memoryForTask(taskAttemptId) += toGrant // µ±Ç°Task»ñÈ¡µ½Äڴ棬ÐèÒªµÇ¼Çµ½memoryForTask±íÖÐ
return toGrant
}
}
0L // Never reached
}

ÊÍ·ÅExecutionÄÚ´æ

Ïà¶ÔÓ¦µÄ£¬ExecutionMemoryPool.releaseMemory()·½·¨ÊµÏÖÁ˶ÔExecutionÄÚ´æµÄÊͷŲÙ×÷£¬·½·¨ÊµÏÖ´úÂëÈçÏÂËùʾ£º

def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
var memoryToFree = if (curMem < numBytes) { // ¼ÆËãÊÍ·ÅÄÚ´æ´óС
logWarning( s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
s"of memory from the $poolName pool")
curMem
} else {
numBytes
}
if (memoryForTask.contains(taskAttemptId)) { // TaskÖ´ÐÐÍê³É£¬´ÓÄÚ²¿Î¬»¤µÄmemoryForTaskÖÐÒÆ³ý
memoryForTask(taskAttemptId) -= memoryToFree
if (memoryForTask(taskAttemptId) <= 0) {
memoryForTask.remove(taskAttemptId)
}
}
lock.notifyAll() // ֪ͨµ÷ÓÃacquireMemory()·½·¨ÉêÇëÄÚ´æµÄTaskÄÚ´æÒѾ­ÊÍ·Å
}

×ܽá

ÐèҪעÒâµÄ£¬Ã¿¸öExecutor JVMÖÐÖ»´æÔÚÒ»¸öUnifiedMemoryManagerʵÀý£¬¸Ã¶ÔÏóͳһ¿ØÖƸÃJVMÄÚ¶ÔStorageºÍExecutionÄÚ´æµÄÉêÇëºÍÊͷŲÙ×÷¡£

ͨ¹ýÉÏÃæµÄ·ÖÎö£¬UnifiedMemoryManager¿ÉÒÔ¿´×öÒ»¸öͳһµÄÄÚ´æ¹ÜÀí¿ØÖÆÆ÷£¬µ×²ãͨ¹ýStorageMemoryPool ÓëExecutionMemoryPoolÌṩµÄÉêÇëÄÚ´æ¡¢ÊÍ·ÅÄÚ´æµÄ¹¦ÄÜ£¬ÊµÏÖ×î»ù±¾µÄbookkeeping¹¦ÄÜ¡£ÔÙÏòµ×²ã£¬Êµ¼Ê²Ù×÷Block¼°ÆäJava¶ÔÏóµÈÊý¾ÝµÄ¹¦ÄÜ£¬¶¼ÊÇÔÚMemoryStoreÖнøÐеģ¬MemoryStore±»ÓÃÀ´ÔÚÄÚ´æÖд洢Êý¾Ý£¬Ö÷Òª°üÀ¨block¡¢·´ÐòÁл¯µÄJava¶ÔÏóÊý×é¡¢ÐòÁл¯µÄByteBuffer£¬Í¬Ê±ËüÌṩÁË´æÈ¡ÄÚ´æÖи÷ÖÖ¸ñʽÊý¾ÝµÄ²Ù×÷¡£¹ØÓÚMemoryStoreµÄ»ù±¾½á¹¹ºÍÔ­Àí£¬ÎÒÃǺóÐø»áµ¥¶À·ÖÎö¡£

   
3219 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ