Allow Spark on YARN to be run from HDFS. Allows the spark.jar, app.jar, and log4j.properties to be placed in HDFS.

tgravescs 2013-11-04 09:40:40 -06:00
parent b5dc3393a5
commit a35472e1dd
8 changed files with 656 additions and 175 deletions

View file

@@ -21,6 +21,7 @@ The assembled JAR will be something like this:
# Preparations
- Building a YARN-enabled assembly (see above).
- The assembled jar can be installed into HDFS or used locally (see the example below).
- Your application code must be packaged into a separate JAR file.
If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` JAR can be generated by running `sbt/sbt assembly`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we assume here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the JAR generated by the sbt assembly command will obviously be different.
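For example, once the assembly jar (and optionally a `log4j.properties`) has been uploaded to HDFS, the client can be pointed at those HDFS paths instead of local files. The snippet below is only an illustrative sketch: the HDFS paths are hypothetical, and it assumes the `spark-class` launcher and the YARN client's `--jar`/`--class` options.

```
# Hypothetical HDFS locations -- adjust to your cluster layout
SPARK_JAR=hdfs:///user/me/share/spark-assembly-{{site.SPARK_VERSION}}.jar \
SPARK_LOG4J_CONF=hdfs:///user/me/share/log4j.properties \
./spark-class org.apache.spark.deploy.yarn.Client \
  --jar hdfs:///user/me/apps/my-app.jar \
  --class org.myorg.MyApp
```

When a jar already lives on the destination filesystem, the client does not re-upload it; it simply registers the existing path with the YARN distributed cache.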

View file

@@ -385,6 +385,12 @@
<version>3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.9.3</artifactId>

View file

@@ -61,6 +61,16 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.9.3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@@ -106,6 +116,46 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
</fail>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
<SPARK_HOME>${basedir}/..</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
<SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@@ -349,7 +349,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
try {
val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
logError("Staging directory is null")
return

View file

@@ -17,26 +17,31 @@
package org.apache.spark.deploy.yarn
import java.net.{InetSocketAddress, URI}
import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.spark.deploy.SparkHadoopUtil
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -46,13 +51,14 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private var distFiles = None: Option[String]
private var distFilesTimeStamps = None: Option[String]
private var distFilesFileSizes = None: Option[String]
private var distArchives = None: Option[String]
private var distArchivesTimeStamps = None: Option[String]
private var distArchivesFileSizes = None: Option[String]
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
// Staging directory is private! -> rwx------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
// App files are world-readable and owner-writable -> rw-r--r--
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
init(yarnConf)
start()
@@ -63,8 +69,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
verifyClusterResources(newApp)
val appContext = createApplicationSubmissionContext(appId)
val localResources = prepareLocalResources(appId, ".sparkStaging")
val env = setupLaunchEnv(localResources)
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val env = setupLaunchEnv(localResources, appStagingDir)
val amContainer = createContainerLaunchContext(newApp, localResources, env)
appContext.setQueue(args.amQueue)
@@ -76,7 +83,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
monitorApplication(appId)
System.exit(0)
}
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
@@ -116,73 +126,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
return appContext
}
/**
* Copy the local file into HDFS and configure to be distributed with the
* job via the distributed cache.
* If a fragment is specified the file will be referenced as that fragment.
/*
* see if two file systems are the same or not.
*/
private def copyLocalFile(
dstDir: Path,
resourceType: LocalResourceType,
originalPath: Path,
replication: Short,
localResources: HashMap[String,LocalResource],
fragment: String,
appMasterOnly: Boolean = false): Unit = {
val fs = FileSystem.get(conf)
val newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
fs.copyFromLocalFile(false, true, originalPath, newPath)
fs.setReplication(newPath, replication);
val destStatus = fs.getFileStatus(newPath)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(resourceType)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
if ((fragment == null) || (fragment.isEmpty())){
localResources(originalPath.getName()) = amJarRsrc
} else {
localResources(fragment) = amJarRsrc
pathURI = new URI(newPath.toString() + "#" + fragment);
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
val srcUri = srcFs.getUri()
val dstUri = destFs.getUri()
if (srcUri.getScheme() == null) {
return false
}
val distPath = pathURI.toString()
if (appMasterOnly == true) return
if (resourceType == LocalResourceType.FILE) {
distFiles match {
case Some(path) =>
distFilesFileSizes = Some(distFilesFileSizes.get + "," +
destStatus.getLen().toString())
distFilesTimeStamps = Some(distFilesTimeStamps.get + "," +
destStatus.getModificationTime().toString())
distFiles = Some(path + "," + distPath)
case _ =>
distFilesFileSizes = Some(destStatus.getLen().toString())
distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
distFiles = Some(distPath)
}
} else {
distArchives match {
case Some(path) =>
distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
destStatus.getModificationTime().toString())
distArchivesFileSizes = Some(distArchivesFileSizes.get + "," +
destStatus.getLen().toString())
distArchives = Some(path + "," + distPath)
case _ =>
distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
distArchivesFileSizes = Some(destStatus.getLen().toString())
distArchives = Some(distPath)
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false
}
var srcHost = srcUri.getHost()
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch {
case e: UnknownHostException =>
return false
}
if (!srcHost.equals(dstHost)) {
return false
}
} else if (srcHost == null && dstHost != null) {
return false
} else if (srcHost != null && dstHost == null) {
return false
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
return true;
}
def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
/**
* Copy the file into HDFS if needed.
*/
private def copyRemoteFile(
dstDir: Path,
originalPath: Path,
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
val remoteFs = originalPath.getFileSystem(conf);
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
fs.setReplication(newPath, replication);
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// resolve any symlinks in the URI path so using a "current" symlink
// to point to a specific version shows the specific version
// in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
val fc = FileContext.getFileContext(qualPath.toUri(), conf)
val destPath = fc.resolvePath(qualPath)
destPath
}
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
// Upload Spark and the application JAR to the remote file system
// Upload Spark and the application JAR to the remote file system if necessary
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
@@ -193,9 +203,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(1)
}
}
val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
val dst = new Path(fs.getHomeDirectory(), pathSuffix)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
if (UserGroupInformation.isSecurityEnabled()) {
@@ -203,55 +211,65 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
dstFs.addDelegationTokens(delegTokenRenewer, credentials);
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
System.exit(1)
}
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
val src = new Path(localPath)
val newPath = new Path(dst, destName)
logInfo("Uploading " + src + " to " + newPath)
fs.copyFromLocalFile(false, true, src, newPath)
fs.setReplication(newPath, replication);
val destStatus = fs.getFileStatus(newPath)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(LocalResourceType.FILE)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
localResources(destName) = amJarRsrc
var localURI = new URI(localPath)
// if not specified assume these are in the local filesystem to keep behavior like Hadoop
if (localURI.getScheme() == null) {
localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
}
}
// handle any add jars
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
tmpURI.getFragment(), true)
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, true)
}
}
// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
args.files.split(',').foreach { case file: String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
tmpURI.getFragment())
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache)
}
}
// handle any distributed cache archives
if ((args.archives != null) && (!args.archives.isEmpty())) {
args.archives.split(',').foreach { case file:String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication,
localResources, tmpURI.getFragment())
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
linkname, statCache)
}
}
@@ -259,44 +277,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
return localResources
}
def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
def setupLaunchEnv(
localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
logInfo("Setting up the launch environment")
val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
val env = new HashMap[String, String]()
Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
localResources("spark.jar").getResource().getFile().toString()
env("SPARK_YARN_JAR_TIMESTAMP") = localResources("spark.jar").getTimestamp().toString()
env("SPARK_YARN_JAR_SIZE") = localResources("spark.jar").getSize().toString()
env("SPARK_YARN_USERJAR_PATH") =
localResources("app.jar").getResource().getScheme.toString() + "://" +
localResources("app.jar").getResource().getFile().toString()
env("SPARK_YARN_USERJAR_TIMESTAMP") = localResources("app.jar").getTimestamp().toString()
env("SPARK_YARN_USERJAR_SIZE") = localResources("app.jar").getSize().toString()
if (log4jConfLocalRes != null) {
env("SPARK_YARN_LOG4J_PATH") =
log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
env("SPARK_YARN_LOG4J_TIMESTAMP") = log4jConfLocalRes.getTimestamp().toString()
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
env("SPARK_YARN_STAGING_DIR") = stagingDir
// set the environment variables to be passed on to the Workers
if (distFiles != None) {
env("SPARK_YARN_CACHE_FILES") = distFiles.get
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
}
if (distArchives != None) {
env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
}
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
@@ -365,6 +360,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
if (args.userClass == null) {
logError("Error: You must specify a user class!")
System.exit(1)
}
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
@@ -432,6 +432,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
object Client {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val LOG4J_PROP: String = "log4j.properties"
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
@@ -453,22 +457,22 @@ object Client {
// If log4j present, ensure ours overrides all others
if (addLog4j) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "log4j.properties")
Path.SEPARATOR + LOG4J_PROP)
}
// normally the user's app.jar is last in case it conflicts with the spark jars
val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "app.jar")
Path.SEPARATOR + APP_JAR)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "spark.jar")
Path.SEPARATOR + SPARK_JAR)
Client.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "app.jar")
Path.SEPARATOR + APP_JAR)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")

View file

@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.net.URI;
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records.LocalResource
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
import org.apache.spark.Logging
import scala.collection.mutable.HashMap
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.Map
/** Client side methods to setup the Hadoop distributed cache */
class ClientDistributedCacheManager() extends Logging {
private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
LinkedHashMap[String, Tuple3[String, String, String]]()
private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
LinkedHashMap[String, Tuple3[String, String, String]]()
/**
* Add a resource to the list of distributed cache resources. This list can
* be sent to the ApplicationMaster and possibly the workers so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
the stats of the resources so they can be sent to the workers and verified.
*
* @param fs FileSystem
* @param conf Configuration
* @param destPath path to the resource
* @param localResources localResource hashMap to insert the resource into
* @param resourceType LocalResourceType
* @param link link presented in the distributed cache to the destination
* @param statCache cache to store the file/directory stats
* @param appMasterOnly Whether to only add the resource to the app master
*/
def addResource(
fs: FileSystem,
conf: Configuration,
destPath: Path,
localResources: HashMap[String, LocalResource],
resourceType: LocalResourceType,
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false) = {
val destStatus = fs.getFileStatus(destPath)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
amJarRsrc.setVisibility(visibility)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
localResources(link) = amJarRsrc
if (appMasterOnly == false) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
} else {
distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
}
}
}
/**
* Adds the necessary cache file env variables to the env passed in
* @param env
*/
def setDistFilesEnv(env: Map[String, String]) = {
val (keys, tupleValues) = distCacheFiles.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
}
}
/**
* Adds the necessary cache archive env variables to the env passed in
* @param env
*/
def setDistArchivesEnv(env: Map[String, String]) = {
val (keys, tupleValues) = distCacheArchives.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
}
}
/**
* Returns the local resource visibility depending on the cache file permissions
* @param conf
* @param uri
* @param statCache
* @return LocalResourceVisibility
*/
def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
if (isPublic(conf, uri, statCache)) {
return LocalResourceVisibility.PUBLIC
}
return LocalResourceVisibility.PRIVATE
}
/**
* Returns a boolean to denote whether a cache file is visible to all (public)
* or not
* @param conf
* @param uri
* @param statCache
* @return true if the path in the uri is visible to all, false otherwise
*/
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
//the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false
}
return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
}
/**
* Returns true if all ancestors of the specified path have the 'execute'
* permission set for all users (i.e. that other users can traverse
* the directory hierarchy to the given path)
* @param fs
* @param path
* @param statCache
* @return true if all ancestors have the 'execute' permission set for all users
*/
def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
var current = path
while (current != null) {
//the subdirs in the path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false
}
current = current.getParent()
}
return true
}
/**
* Checks for a given path whether the Other permissions on it
* imply the permission in the passed FsAction
* @param fs
* @param path
* @param action
* @param statCache
* @return true if the path in the uri is visible to all, false otherwise
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
val status = getFileStatus(fs, path.toUri(), statCache);
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
return true;
}
return false
}
/**
* Checks to see if the given uri exists in the cache; if it does, it
* returns the existing FileStatus, otherwise it stats the uri, stores
* it in the cache, and returns the FileStatus.
* @param fs
* @param uri
* @param statCache
* @return FileStatus
*/
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>
val newStat = fs.getFileStatus(new Path(uri))
statCache.put(uri, newStat)
newStat
}
return stat
}
}

View file

@@ -142,11 +142,12 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
size: String) = {
size: String,
vis: String) = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)
@@ -158,44 +159,14 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
logInfo("Preparing Local resources")
val localResources = HashMap[String, LocalResource]()
// Spark JAR
val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
sparkJarResource.setType(LocalResourceType.FILE)
sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
localResources("spark.jar") = sparkJarResource
// User JAR
val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
userJarResource.setType(LocalResourceType.FILE)
userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
localResources("app.jar") = userJarResource
// Log4j conf - if available
if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
log4jConfResource.setType(LocalResourceType.FILE)
log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
localResources("log4j.properties") = log4jConfResource
}
if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
for( i <- 0 to distFiles.length - 1) {
setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
fileSizes(i))
fileSizes(i), visibilities(i))
}
}
@@ -203,9 +174,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
for( i <- 0 to distArchives.length - 1) {
setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
timeStamps(i), fileSizes(i))
timeStamps(i), fileSizes(i), visibilities(i))
}
}

View file

@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.net.URI;
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito.when
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records.LocalResource
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
return LocalResourceVisibility.PRIVATE
}
}
test("test getFileStatus empty") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
val uri = new URI("/tmp/testing")
when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val stat = distMgr.getFileStatus(fs, uri, statCache)
assert(stat.getPath() === null)
}
test("test getFileStatus cached") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
val uri = new URI("/tmp/testing")
val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
val stat = distMgr.getFileStatus(fs, uri, statCache)
assert(stat.getPath().toString() === "/tmp/testing")
}
test("test addResource") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 0)
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
// add another one and verify both are present and in the correct order
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing2"))
val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
statCache, false)
val resource2 = localResources("link2")
assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
assert(resource2.getTimestamp() === 10)
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
val env2 = new HashMap[String, String]()
distMgr.setDistFilesEnv(env2)
val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val files = env2("SPARK_YARN_CACHE_FILES").split(',')
val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(timestamps(0) === "0")
assert(sizes(0) === "0")
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
assert(timestamps(1) === "10")
assert(sizes(1) === "20")
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
test("test addResource link null") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
intercept[Exception] {
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
statCache, false)
}
assert(localResources.get("link") === None)
assert(localResources.size === 0)
}
test("test addResource appmaster only") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, true)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 10)
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
}
test("test addResource archive") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 10)
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistArchivesEnv(env)
assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
}
}