[SPARK-24992][CORE] spark should randomize yarn local dir selection

**Description: [SPARK-24992](https://issues.apache.org/jira/browse/SPARK-24992)**
Utils.getLocalDir is used to get the path of a temporary directory. However, it always returns the same directory: the first element of the localRootDirs array. When running on YARN, this can mean we always write to one disk, keeping it busy while the other disks sit idle. We should randomize the selection to spread the load.
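The old behavior can be sketched in isolation (hypothetical directory names, not the Spark source): `headOption` pins the choice to the first configured directory, so every temp file lands on the same disk.

```scala
// Sketch of the pre-patch behavior with made-up paths.
// headOption always yields the first element of the array.
val localRootDirs = Array("/disk1/tmp", "/disk2/tmp", "/disk3/tmp")
val chosen = localRootDirs.headOption.getOrElse(
  throw new java.io.IOException("Failed to get a temp directory"))
// chosen is always "/disk1/tmp", regardless of how many dirs are configured
```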

**What changes were proposed in this pull request?**
This PR randomizes the selection of the local directory inside Utils.getLocalDir. This change affects Utils.fetchFile, which relied on Utils.getLocalDir always returning the same directory for its file cache. Therefore, a new variable, cachedLocalDir, is used to cache the first local directory obtained from Utils.getLocalDir. Also, when getting the configured local directories (inside Utils.getConfiguredLocalDirs) in YARN mode, the array of directories is randomized before being returned.
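The cachedLocalDir idea described above is a double-checked locking pattern; a minimal sketch (illustrative names and paths, not the Spark source) looks like this:

```scala
object LocalDirCache {
  // First check reads this volatile field without locking.
  @volatile private var cachedLocalDir: String = ""

  // Hypothetical stand-in for the now-randomized Utils.getLocalDir.
  private def getLocalDir(): String = {
    val dirs = Array("/disk1/tmp", "/disk2/tmp")
    dirs(scala.util.Random.nextInt(dirs.length))
  }

  // Double-checked locking: only the first caller synchronizes; every
  // later call sees the same cached directory via the volatile read.
  def cachedDir(): String = {
    if (cachedLocalDir.isEmpty) {
      this.synchronized {
        if (cachedLocalDir.isEmpty) {
          cachedLocalDir = getLocalDir()
        }
      }
    }
    cachedLocalDir
  }
}
```

Once set, the cached directory is stable for the lifetime of the process, so files fetched into the cache can be found again even though getLocalDir itself is now random.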

Author: Hieu Huynh <“Hieu.huynh@oath.com”>

Closes #21953 from hthuynh2/SPARK_24992.
Commit 51e2b38d93 (parent 1a5e460762), authored by Hieu Huynh on 2018-08-06 13:58:28 -05:00, committed by Thomas Graves.


```diff
@@ -83,6 +83,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+  @volatile private var cachedLocalDir: String = ""
   /**
    * Define a default value for driver memory here since this value is referenced across the code
```
```diff
@@ -462,7 +463,15 @@ private[spark] object Utils extends Logging {
     if (useCache && fetchCacheEnabled) {
       val cachedFileName = s"${url.hashCode}${timestamp}_cache"
       val lockFileName = s"${url.hashCode}${timestamp}_lock"
-      val localDir = new File(getLocalDir(conf))
+      // Set the cachedLocalDir for the first time and re-use it later
+      if (cachedLocalDir.isEmpty) {
+        this.synchronized {
+          if (cachedLocalDir.isEmpty) {
+            cachedLocalDir = getLocalDir(conf)
+          }
+        }
+      }
+      val localDir = new File(cachedLocalDir)
       val lockFile = new File(localDir, lockFileName)
       val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
       // Only one executor entry.
```
```diff
@@ -767,13 +776,17 @@ private[spark] object Utils extends Logging {
    * - Otherwise, this will return java.io.tmpdir.
    *
    * Some of these configuration options might be lists of multiple paths, but this method will
-   * always return a single directory.
+   * always return a single directory. The return directory is chosen randomly from the array
+   * of directories it gets from getOrCreateLocalRootDirs.
    */
   def getLocalDir(conf: SparkConf): String = {
-    getOrCreateLocalRootDirs(conf).headOption.getOrElse {
+    val localRootDirs = getOrCreateLocalRootDirs(conf)
+    if (localRootDirs.isEmpty) {
       val configuredLocalDirs = getConfiguredLocalDirs(conf)
       throw new IOException(
         s"Failed to get a temp directory under [${configuredLocalDirs.mkString(",")}].")
+    } else {
+      localRootDirs(scala.util.Random.nextInt(localRootDirs.length))
+    }
   }
```
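The new selection logic in getLocalDir can be tried standalone; a minimal sketch with hypothetical paths:

```scala
import scala.util.Random

// Uniform pick from the root dirs: over many calls, each configured
// disk is chosen with equal probability, spreading temp-file load.
val localRootDirs = Array("/disk1/tmp", "/disk2/tmp", "/disk3/tmp")
val chosen = localRootDirs(Random.nextInt(localRootDirs.length))
```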
```diff
@@ -815,7 +828,7 @@ private[spark] object Utils extends Logging {
       // to what Yarn on this system said was available. Note this assumes that Yarn has
       // created the directories already, and that they are secured so that only the
       // user has access to them.
-      getYarnLocalDirs(conf).split(",")
+      randomizeInPlace(getYarnLocalDirs(conf).split(","))
     } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
       conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
     } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
```
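An in-place shuffle like the randomizeInPlace call above can be sketched as a Fisher–Yates shuffle; this is an illustrative version, not necessarily the exact Spark implementation:

```scala
import scala.util.Random

// Minimal in-place Fisher–Yates shuffle: walk the array from the end,
// swapping each slot with a uniformly chosen earlier (or same) slot.
def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
  for (i <- (arr.length - 1) to 1 by -1) {
    val j = rand.nextInt(i + 1)
    val tmp = arr(j)
    arr(j) = arr(i)
    arr(i) = tmp
  }
  arr
}
```

Because the shuffle happens once per call to getConfiguredLocalDirs, different executors on the same node end up preferring different YARN local dirs.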