[SPARK-14602][YARN] Use SparkConf to propagate the list of cached files.

This change avoids using the environment to pass this information, since
with many jars it's easy to hit limits on certain OSes. Instead, it encodes
the information into the Spark configuration propagated to the AM.

The first problem that needed to be solved is a chicken-and-egg issue: the
config file is distributed using the cache, and it needs to contain information
about the files that are being distributed. To solve that, the code now treats
the config archive specially, and uses slightly different code to distribute
it, so that only its cache path needs to be saved to the config file.
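
A minimal sketch of the two halves of that approach (hypothetical helper names;
the "spark.yarn.cache.confArchive" key is the one added by this change):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf

// Client side: upload the conf archive yourself, then record only its remote path.
def recordConfArchive(conf: SparkConf, remoteArchive: Path): Unit =
  conf.set("spark.yarn.cache.confArchive", remoteArchive.toString)

// AM side: resolve the recorded path and stat it, so the archive can be registered
// as an ARCHIVE local resource for the executors.
def resolveConfArchive(conf: SparkConf, hadoopConf: Configuration): Option[(Path, Long, Long)] =
  conf.getOption("spark.yarn.cache.confArchive").map { uri =>
    val path = new Path(uri)
    val status = FileSystem.get(path.toUri, hadoopConf).getFileStatus(path)
    (path, status.getLen, status.getModificationTime)
  }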

The second problem is that the extra information would show up in the Web UI,
which made the environment tab even noisier than it already is when lots
of jars are listed. This is solved by two changes: first, the list of cached files
is now read only once, in the AM, and propagated down to the ExecutorRunnable
code (which actually sends the list to the NMs when starting containers); second,
those config entries are unset after the list is read, so that the SparkContext
never sees them.
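
The pattern, as a rough self-contained sketch (hypothetical file and timestamp
values; the key names match the new spark.yarn.cache.* entries added here):

import org.apache.spark.SparkConf

// The client packs the cached-file metadata into internal config entries.
val conf = new SparkConf(false)
conf.set("spark.yarn.cache.filenames", "hdfs:///cache/app.jar#app.jar")
conf.set("spark.yarn.cache.timestamps", "1461193043000")

// The AM reads the list exactly once while building the executor local resources...
val cachedFiles = conf.getOption("spark.yarn.cache.filenames")
  .map(_.split(",").toSeq).getOrElse(Nil)

// ...and then drops the entries so they never reach the SparkContext or the UI.
Seq("spark.yarn.cache.filenames", "spark.yarn.cache.timestamps").foreach { key =>
  conf.remove(key)
  sys.props.remove(key)
}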

Tested with both client and cluster mode by running "run-example SparkPi". This
uploads a whole lot of files when run from a build dir (instead of a distribution,
where the list is cleaned up), and I verified that the configs do not show
up in the UI.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12487 from vanzin/SPARK-14602.
Marcelo Vanzin 2016-04-20 16:57:23 -07:00
parent 334c293ec0
commit f47dbf27fa
11 changed files with 239 additions and 175 deletions


@ -225,6 +225,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
private[spark] def remove(entry: ConfigEntry[_]): SparkConf = {
remove(entry.key)
}
/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
getOption(key).getOrElse(throw new NoSuchElementException(key))


@ -19,15 +19,17 @@ package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
import java.net.{Socket, URL}
import java.net.{Socket, URI, URL}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@ -120,6 +122,61 @@ private[spark] class ApplicationMaster(
private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
// Load the list of localized files set by the client. This is used when launching executors,
// and is loaded here so that these configs don't pollute the Web UI's environment page in
// cluster mode.
private val localResources = {
logInfo("Preparing Local resources")
val resources = HashMap[String, LocalResource]()
def setupDistributedCache(
file: String,
rtype: LocalResourceType,
timestamp: String,
size: String,
vis: String): Unit = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)
val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
resources(fileName) = amJarRsrc
}
val distFiles = sparkConf.get(CACHED_FILES)
val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
val resTypes = sparkConf.get(CACHED_FILES_TYPES)
for (i <- 0 to distFiles.size - 1) {
val resType = LocalResourceType.valueOf(resTypes(i))
setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
visibilities(i))
}
// Distribute the conf archive to executors.
sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
val fs = FileSystem.get(new URI(uri), yarnConf)
val status = fs.getFileStatus(new Path(uri))
setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
}
// Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
CACHE_CONFIGS.foreach { e =>
sparkConf.remove(e)
sys.props.remove(e.key)
}
logInfo("Prepared Local resources " + resources)
resources.toMap
}
def getAttemptId(): ApplicationAttemptId = {
client.getAttemptId()
}
@ -292,7 +349,8 @@ private[spark] class ApplicationMaster(
_sparkConf,
uiAddress,
historyAddress,
securityMgr)
securityMgr,
localResources)
allocator.allocateResources()
reporterThread = launchReporterThread()


@ -328,12 +328,14 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
replication: Short): Path = {
replication: Short,
force: Boolean = false,
destName: Option[String] = None): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
if (!compareFs(srcFs, destFs)) {
destPath = new Path(destDir, srcPath.getName())
if (force || !compareFs(srcFs, destFs)) {
destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
@ -553,12 +555,37 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}
// Distribute an archive with Hadoop and Spark configuration for the AM and executors.
// Update the configuration with all the distributed files, minus the conf archive. The
// conf archive will be handled by the AM differently so that we avoid having to send
// this configuration by other means. See SPARK-14602 for one reason of why this is needed.
distCacheMgr.updateConfiguration(sparkConf)
// Upload the conf archive to HDFS manually, and record its location in the configuration.
// This will allow the AM to know where the conf archive is in HDFS, so that it can be
// distributed to the containers.
//
// This code forces the archive to be copied, so that unit tests pass (since in that case both
// file systems are the same and the archive wouldn't normally be copied). In most (all?)
// deployments, the archive would be copied anyway, since it's a temp file in the local file
// system.
val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
val localConfArchive = new Path(createConfArchive().toURI())
copyFileToRemote(destDir, localConfArchive, replication, force = true,
destName = Some(LOCALIZED_CONF_ARCHIVE))
val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)
// Clear the cache-related entries from the configuration to avoid them polluting the
// UI's environment page. This works for client mode; for cluster mode, this is handled
// by the AM.
CACHE_CONFIGS.foreach(sparkConf.remove)
localResources
}
@ -787,10 +814,6 @@ private[spark] class Client(
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(launchEnv)
distCacheMgr.setDistArchivesEnv(launchEnv)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
@ -1150,6 +1173,9 @@ private object Client extends Logging {
// Subdirectory where the user's Spark and Hadoop config files will be placed.
val LOCALIZED_CONF_DIR = "__spark_conf__"
// File containing the conf archive in the AM. See prepareLocalResources().
val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"


@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
import java.net.URI
import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@ -27,17 +27,21 @@ import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
private case class CacheEntry(
uri: URI,
size: Long,
modTime: Long,
visibility: LocalResourceVisibility,
resType: LocalResourceType)
/** Client side methods to setup the Hadoop distributed cache */
private[spark] class ClientDistributedCacheManager() extends Logging {
// Mappings from remote URI to (file status, modification time, visibility)
private val distCacheFiles: Map[String, (String, String, String)] =
LinkedHashMap[String, (String, String, String)]()
private val distCacheArchives: Map[String, (String, String, String)] =
LinkedHashMap[String, (String, String, String)]()
private val distCacheEntries = new ListBuffer[CacheEntry]()
/**
* Add a resource to the list of distributed cache resources. This list can
@ -72,61 +76,33 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
require(link != null && link.nonEmpty, "You must specify a valid link name.")
localResources(link) = amJarRsrc
if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
} else {
distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
}
distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
visibility, resourceType)
}
}
/**
* Adds the necessary cache file env variables to the env passed in
* Writes down information about cached files needed in executors to the given configuration.
*/
def setDistFilesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheFiles.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
}
/**
* Adds the necessary cache archive env variables to the env passed in
*/
def setDistArchivesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheArchives.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
def updateConfiguration(conf: SparkConf): Unit = {
conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
}
/**
* Returns the local resource visibility depending on the cache file permissions
* @return LocalResourceVisibility
*/
def getVisibility(
private[yarn] def getVisibility(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
@ -141,7 +117,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Returns a boolean to denote whether a cache file is visible to all (public)
* @return true if the path in the uri is visible to all, false otherwise
*/
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
// the leaf level file should be readable by others
@ -157,7 +133,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* the directory hierarchy to the given path)
* @return true if all ancestors have the 'execute' permission set for all users
*/
def ancestorsHaveExecutePermissions(
private def ancestorsHaveExecutePermissions(
fs: FileSystem,
path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
@ -177,7 +153,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
def checkPermissionOfOther(
private def checkPermissionOfOther(
fs: FileSystem,
path: Path,
action: FsAction,
@ -194,7 +170,10 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* it in the cache, and returns the FileStatus.
* @return FileStatus
*/
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
private[yarn] def getFileStatus(
fs: FileSystem,
uri: URI,
statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>


@ -18,7 +18,6 @@
package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
import java.nio.ByteBuffer
import java.util.Collections
@ -55,7 +54,8 @@ private[yarn] class ExecutorRunnable(
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager)
securityMgr: SecurityManager,
localResources: Map[String, LocalResource])
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
@ -77,9 +77,7 @@ private[yarn] class ExecutorRunnable(
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
val localResources = prepareLocalResources
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env.asJava)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@ -88,7 +86,7 @@ private[yarn] class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
appId, localResources)
appId)
logInfo(s"""
|===============================================================================
@ -136,8 +134,7 @@ private[yarn] class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
localResources: HashMap[String, LocalResource]): List[String] = {
appId: String): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@ -239,53 +236,6 @@ private[yarn] class ExecutorRunnable(
commands.map(s => if (s == null) "null" else s).toList
}
private def setupDistributedCache(
file: String,
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
size: String,
vis: String): Unit = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)
localResources(uri.getFragment()) = amJarRsrc
}
private def prepareLocalResources: HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
val localResources = HashMap[String, LocalResource]()
if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
for( i <- 0 to distFiles.length - 1) {
setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
fileSizes(i), visibilities(i))
}
}
if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
for( i <- 0 to distArchives.length - 1) {
setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
timeStamps(i), fileSizes(i), visibilities(i))
}
}
logInfo("Prepared Local resources " + localResources)
localResources
}
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))


@ -63,7 +63,8 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager)
securityMgr: SecurityManager,
localResources: Map[String, LocalResource])
extends Logging {
import YarnAllocator._
@ -477,7 +478,8 @@ private[yarn] class YarnAllocator(
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr)
securityMgr,
localResources)
if (launchContainers) {
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))


@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.util.Try
import org.apache.hadoop.conf.Configuration
@ -52,6 +51,8 @@ private[spark] class YarnRMClient extends Logging {
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
* @param securityMgr The security manager.
* @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
driverUrl: String,
@ -60,7 +61,8 @@ private[spark] class YarnRMClient extends Logging {
sparkConf: SparkConf,
uiAddress: String,
uiHistoryAddress: String,
securityMgr: SecurityManager
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
@ -72,7 +74,8 @@ private[spark] class YarnRMClient extends Logging {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources)
}
/**


@ -261,4 +261,54 @@ package object config {
.stringConf
.toSequence
.createOptional
/* Configuration and cached file propagation. */
private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames")
.internal()
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes")
.internal()
.longConf
.toSequence
.createWithDefault(Nil)
private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps")
.internal()
.longConf
.toSequence
.createWithDefault(Nil)
private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities")
.internal()
.stringConf
.toSequence
.createWithDefault(Nil)
// Either "file" or "archive", for each file.
private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types")
.internal()
.stringConf
.toSequence
.createWithDefault(Nil)
// The location of the conf archive in HDFS.
private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive")
.internal()
.stringConf
.createOptional
// The list of cache-related config entries. This is used by Client and the AM to clean
// up the environment so that these settings do not appear on the web UI.
private[yarn] val CACHE_CONFIGS = Seq(
CACHED_FILES,
CACHED_FILES_SIZES,
CACHED_FILES_TIMESTAMPS,
CACHED_FILES_VISIBILITIES,
CACHED_FILES_TYPES,
CACHED_CONF_ARCHIVE)
}


@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.config._
class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
val sparkConf = new SparkConf(false)
distMgr.updateConfiguration(sparkConf)
assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
// add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
val env2 = new HashMap[String, String]()
distMgr.setDistFilesEnv(env2)
val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val files = env2("SPARK_YARN_CACHE_FILES").split(',')
val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
val sparkConf2 = new SparkConf(false)
distMgr.updateConfiguration(sparkConf2)
val files = sparkConf2.get(CACHED_FILES)
val sizes = sparkConf2.get(CACHED_FILES_SIZES)
val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(timestamps(0) === "0")
assert(sizes(0) === "0")
assert(timestamps(0) === 0)
assert(sizes(0) === 0)
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
assert(timestamps(1) === "10")
assert(sizes(1) === "20")
assert(timestamps(1) === 10)
assert(sizes(1) === 20)
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
val sparkConf = new SparkConf(false)
distMgr.updateConfiguration(sparkConf)
assert(sparkConf.get(CACHED_FILES) === Nil)
assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
}
test("test addResource archive") {
@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistArchivesEnv(env)
assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
val sparkConf = new SparkConf(false)
distMgr.updateConfiguration(sparkConf)
assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
}
}


@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort())
any(classOf[Path]), anyShort(), anyBoolean(), any())
client
}


@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
new SecurityManager(sparkConf))
new SecurityManager(sparkConf),
Map())
}
def createContainer(host: String): Container = {