Made subdirs per local dir configurable, and reduced lock usage a bit
This commit is contained in:
parent
ae8c7d6cfa
commit
e54e1d7043
|
@ -14,13 +14,13 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
extends BlockStore(blockManager) {
|
||||
|
||||
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
|
||||
val SUBDIRS_PER_LOCAL_DIR = 128
|
||||
val subDirsPerLocalDir = System.getProperty("spark.diskStore.subdirs", "64").toInt
|
||||
|
||||
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
|
||||
// directory, create multiple subdirectories that we will hash files into, in order to avoid
|
||||
// having really large inodes at the top level.
|
||||
val localDirs = createLocalDirs()
|
||||
val subDirs = Array.fill(localDirs.length)(new Array[File](SUBDIRS_PER_LOCAL_DIR))
|
||||
val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
|
||||
|
||||
addShutdownHook()
|
||||
|
||||
|
@ -92,10 +92,12 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
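// Figure out which local directory the block hashes to, and which subdirectory within it.
// NOTE(review): math.abs(Int.MinValue) is still negative, so a blockId whose hashCode is
// Int.MinValue could produce negative indices below — confirm and guard if needed.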
|
||||
val hash = math.abs(blockId.hashCode)
|
||||
val dirId = hash % localDirs.length
|
||||
val subDirId = (hash / localDirs.length) % SUBDIRS_PER_LOCAL_DIR
|
||||
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
|
||||
|
||||
// Create the subdirectory if it doesn't already exist
|
||||
val subDir = subDirs(dirId).synchronized {
|
||||
var subDir = subDirs(dirId)(subDirId)
|
||||
if (subDir == null) {
|
||||
subDir = subDirs(dirId).synchronized {
|
||||
val old = subDirs(dirId)(subDirId)
|
||||
if (old != null) {
|
||||
old
|
||||
|
@ -106,6 +108,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
newDir
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
new File(subDir, blockId)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue