commit
bf984e2745
|
@ -20,6 +20,7 @@ package org.apache.spark.network.netty
|
|||
import java.io.File
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
||||
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
|
||||
|
@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
|
|||
throw new Exception("Block " + blockId + " is not a shuffle block")
|
||||
}
|
||||
// Figure out which local directory it hashes to, and which subdirectory in that
|
||||
val hash = math.abs(blockId.hashCode)
|
||||
val hash = Utils.nonNegativeHash(blockId)
|
||||
val dirId = hash % localDirs.length
|
||||
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
|
||||
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
|
||||
|
|
|
@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
logDebug("Getting file for block " + blockId)
|
||||
|
||||
// Figure out which local directory it hashes to, and which subdirectory in that
|
||||
val hash = math.abs(blockId.hashCode)
|
||||
val hash = Utils.nonNegativeHash(blockId)
|
||||
val dirId = hash % localDirs.length
|
||||
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
|
||||
|
||||
|
|
|
@ -778,4 +778,18 @@ private[spark] object Utils extends Logging {
|
|||
val rawMod = x % mod
|
||||
rawMod + (if (rawMod < 0) mod else 0)
|
||||
}
|
||||
|
||||
// Handles idiosyncracies with hash (add more as required)
|
||||
def nonNegativeHash(obj: AnyRef): Int = {
|
||||
|
||||
// Required ?
|
||||
if (obj eq null) return 0
|
||||
|
||||
val hash = obj.hashCode
|
||||
// math.abs fails for Int.MinValue
|
||||
val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
|
||||
|
||||
// Nothing else to guard against ?
|
||||
hashAbs
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue