Merge pull request #653 from rxin/logging
SPARK-781: Log the temp directory path when Spark says "Failed to create temp directory."
This commit is contained in:
commit
e82a2ffcc9
|
@ -116,8 +116,8 @@ private object Utils extends Logging {
|
|||
while (dir == null) {
|
||||
attempts += 1
|
||||
if (attempts > maxAttempts) {
|
||||
throw new IOException("Failed to create a temp directory after " + maxAttempts +
|
||||
" attempts!")
|
||||
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
|
||||
maxAttempts + " attempts!")
|
||||
}
|
||||
try {
|
||||
dir = new File(root, "spark-" + UUID.randomUUID.toString)
|
||||
|
|
|
@ -82,15 +82,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
override def size(): Long = lastValidPosition
|
||||
}
|
||||
|
||||
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
|
||||
val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
|
||||
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
|
||||
private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
|
||||
|
||||
var shuffleSender : ShuffleSender = null
|
||||
private var shuffleSender : ShuffleSender = null
|
||||
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
|
||||
// directory, create multiple subdirectories that we will hash files into, in order to avoid
|
||||
// having really large inodes at the top level.
|
||||
val localDirs = createLocalDirs()
|
||||
val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
|
||||
private val localDirs: Array[File] = createLocalDirs()
|
||||
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
|
||||
|
||||
addShutdownHook()
|
||||
|
||||
|
@ -99,7 +99,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
new DiskBlockObjectWriter(blockId, serializer, bufferSize)
|
||||
}
|
||||
|
||||
|
||||
override def getSize(blockId: String): Long = {
|
||||
getFile(blockId).length()
|
||||
}
|
||||
|
@ -232,8 +231,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
private def createLocalDirs(): Array[File] = {
|
||||
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
|
||||
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
|
||||
rootDirs.split(",").map(rootDir => {
|
||||
var foundLocalDir: Boolean = false
|
||||
rootDirs.split(",").map { rootDir =>
|
||||
var foundLocalDir = false
|
||||
var localDir: File = null
|
||||
var localDirId: String = null
|
||||
var tries = 0
|
||||
|
@ -248,7 +247,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logWarning("Attempt " + tries + " to create local dir failed", e)
|
||||
logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
|
||||
}
|
||||
}
|
||||
if (!foundLocalDir) {
|
||||
|
@ -258,7 +257,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
}
|
||||
logInfo("Created local directory at " + localDir)
|
||||
localDir
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private def addShutdownHook() {
|
||||
|
@ -266,15 +265,16 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
|
||||
override def run() {
|
||||
logDebug("Shutdown hook called")
|
||||
try {
|
||||
localDirs.foreach { localDir =>
|
||||
localDirs.foreach { localDir =>
|
||||
try {
|
||||
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
logError("Exception while deleting local spark dir: " + localDir, t)
|
||||
}
|
||||
if (shuffleSender != null) {
|
||||
shuffleSender.stop
|
||||
}
|
||||
} catch {
|
||||
case t: Throwable => logError("Exception while deleting local spark dirs", t)
|
||||
}
|
||||
if (shuffleSender != null) {
|
||||
shuffleSender.stop
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue