merge upstream/master

liguoqiang 2014-01-03 16:06:34 +08:00
commit 8ddbd531a4
36 changed files with 198 additions and 1236 deletions

View file

@ -124,7 +124,17 @@
<profiles>
<profile>
<id>hadoop2-yarn</id>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>

View file

@ -37,20 +37,16 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit
# Cloudera CDH 4.2.0 with MapReduce v1
$ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" profile and set the "yarn.version" property:
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "yarn-alpha" or "yarn" profile and set the "hadoop.version" and "yarn.version" properties:
# Apache Hadoop 2.0.5-alpha
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
$ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
# Cloudera CDH 4.2.0 with MapReduce v2
$ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-cdh4.2.0 -DskipTests clean package
$ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-cdh4.2.0 -DskipTests clean package
Hadoop versions 2.2.x and newer can be built by enabling the ```new-yarn``` profile and setting ```yarn.version``` as follows:
# Apache Hadoop 2.2.X and newer
$ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
The build process handles Hadoop 2.2.x as a special case that uses the directory ```new-yarn```, which supports the new YARN API. Furthermore, for this version, the build depends on artifacts published by the Spark project to enable Akka 2.0.5 to work with protobuf 2.5.
# Apache Hadoop 2.2.X (e.g. 2.2.0 as below) and newer
$ mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
## Spark Tests in Maven ##

View file

@ -116,8 +116,6 @@ For example:
# Building Spark for Hadoop/YARN 2.2.x
Hadoop 2.2.x users must build Spark and publish it locally. The SBT build process handles Hadoop 2.2.x as a special case. This version of Hadoop has new YARN API changes and depends on a Protobuf version (2.5) that is not compatible with the Akka version (2.0.5) that Spark uses. Therefore, if the Hadoop version (e.g. set through ```SPARK_HADOOP_VERSION```) starts with 2.2.0 or higher, then the build process will depend on Akka artifacts distributed by the Spark project that are compatible with Protobuf 2.5. Furthermore, the build process then uses the directory ```new-yarn``` (instead of ```yarn```), which supports the new YARN API. The build process should work seamlessly out of the box.
See [Building Spark with Maven](building-with-maven.html) for instructions on how to build Spark using the Maven process.
# Important Notes
@ -126,4 +124,3 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
- The local directories used by Spark will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with a # fragment, similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt: this uploads the file you have locally named localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN (see the sketch after these notes).
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
- YARN 2.2.x users cannot simply depend on the Spark packages without building Spark, as the published Spark artifacts are compiled to work with the pre 2.2 API. Those users must build Spark and publish it locally.
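As a rough illustration of the --files link-name behavior described in the notes above (the file and object names here are hypothetical, not from this commit), an application running on YARN would read the distributed file through its link name rather than its original local name:

```scala
import scala.io.Source

// Hypothetical sketch: the job was submitted with
//   --files localtest.txt#appSees.txt
// so YARN localizes the file into the container's working directory under the
// link name "appSees.txt"; the application refers to it by that name only.
object AppSeesExample {
  def main(args: Array[String]) {
    val lines = Source.fromFile("appSees.txt").getLines().toList
    println("Lines in appSees.txt: " + lines.size)
  }
}
```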

View file

@ -1,161 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${yarn.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
</fail>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
<SPARK_HOME>${basedir}/..</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
<SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>

59
pom.xml
View file

@ -722,7 +722,7 @@
<profiles>
<profile>
<id>hadoop2-yarn</id>
<id>yarn-alpha</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
@ -735,57 +735,20 @@
<module>yarn</module>
</modules>
<repositories>
<repository>
<id>maven-root</id>
<name>Maven root repository</name>
<url>http://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
</dependencies>
</dependencyManagement>
</profile>
<profile>
<id>new-yarn</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
<hadoop.version>2.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
</properties>
<id>yarn</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
<hadoop.version>2.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
</properties>
<modules>
<module>yarn</module>
</modules>
<modules>
<module>new-yarn</module>
</modules>
<repositories>
<repository>
<id>maven-root</id>
<name>Maven root repository</name>
<url>http://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
</dependencies>
</dependencyManagement>
</profile>
</profile>
<profile>
<id>repl-bin</id>

View file

@ -85,12 +85,11 @@ object SparkBuild extends Build {
}
// Conditionally include the yarn sub-project
lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
//lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
@ -320,10 +319,29 @@ object SparkBuild extends Build {
)
)
def yarnSettings = sharedSettings ++ Seq(
name := "spark-yarn"
def yarnCommonSettings = sharedSettings ++ Seq(
unmanagedSourceDirectories in Compile <++= baseDirectory { base =>
Seq(
base / "../common/src/main/scala"
)
},
unmanagedSourceDirectories in Test <++= baseDirectory { base =>
Seq(
base / "../common/src/test/scala"
)
}
) ++ extraYarnSettings
def yarnAlphaSettings = yarnCommonSettings ++ Seq(
name := "spark-yarn-alpha"
)
def yarnSettings = yarnCommonSettings ++ Seq(
name := "spark-yarn"
)
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()

12
yarn/README.md Normal file
View file

@ -0,0 +1,12 @@
# YARN DIRECTORY LAYOUT
Hadoop YARN related code is organized into separate directories to minimize duplicated code.
* common : Common code that does not depend on a specific version of Hadoop.
* alpha / stable : Code that involves a specific version of the Hadoop YARN API.
alpha represents 0.23 and 2.0.x.
stable represents 2.2 and later, until the API changes again.
alpha / stable are each built together with the common directory into a single jar (see the sketch below).
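A minimal SBT sketch of that layout, mirroring the yarnCommonSettings shown in the SparkBuild.scala hunk above; core and sharedSettings are stand-ins for the real build definitions, so treat this as an illustration rather than the actual project/SparkBuild.scala:

```scala
import sbt._
import Keys._

object YarnLayoutSketch extends Build {
  // Stand-ins for the real definitions in project/SparkBuild.scala.
  lazy val core = Project("core", file("core"))
  def sharedSettings = Defaults.defaultSettings

  // Both version-specific projects also compile the shared yarn/common sources,
  // so each one builds the common code into its single jar.
  def yarnCommonSettings = sharedSettings ++ Seq(
    unmanagedSourceDirectories in Compile <++= baseDirectory { base =>
      Seq(base / "../common/src/main/scala")
    }
  )

  lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"),
    settings = yarnCommonSettings ++ Seq(name := "spark-yarn-alpha")) dependsOn(core)
  lazy val yarnStable = Project("yarn", file("yarn/stable"),
    settings = yarnCommonSettings ++ Seq(name := "spark-yarn")) dependsOn(core)
}
```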

32
yarn/alpha/pom.xml Normal file
View file

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>yarn-parent_2.10</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Alpha API</name>
</project>

View file

@ -37,33 +37,33 @@ import org.apache.spark.scheduler.SplitInfo
class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null
private var appAttemptId: ApplicationAttemptId = null
private var reporterThread: Thread = null
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
private var reporterThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
private var driverClosed: Boolean = false
private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
var actor: ActorRef = null
var actor: ActorRef = _
// This actor just works as a monitor to watch the driver actor.
class MonitorActor(driverUrl: String) extends Actor {
var driver: ActorSelection = null
var driver: ActorSelection = _
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! "hello"
// Send a hello message so the connection is actually established and we can monitor lifecycle events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@ -85,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
if (minimumMemory > 0) {
val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
@ -106,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// must be <= timeoutInterval / 2.
// On the other hand, also ensure that we are reasonably responsive without causing too many requests to RM,
// so at least 1 minute or timeoutInterval / 10 - whichever is higher.
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
@ -167,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
Thread.sleep(100)
}
}
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@ -190,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
Thread.sleep(100)
}
@ -201,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {

View file

@ -52,7 +52,7 @@ class WorkerRunnable(
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = null
var cm: ContainerManager = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {

View file

@ -25,11 +25,10 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
<artifactId>yarn-parent_2.10</artifactId>
<packaging>pom</packaging>
<name>Spark Project YARN Parent POM</name>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@ -73,45 +72,52 @@
</dependency>
</dependencies>
<profiles>
<profile>
<id>yarn-alpha</id>
<modules>
<module>alpha</module>
</modules>
</profile>
<profile>
<id>yarn</id>
<modules>
<module>stable</module>
</modules>
</profile>
</profiles>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<id>add-scala-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>shade</goal>
<goal>add-source</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<sources>
<source>src/main/scala</source>
<source>../common/src/main/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
<source>../common/src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
@ -150,12 +156,16 @@
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
<SPARK_HOME>${basedir}/..</SPARK_HOME>
<SPARK_HOME>${basedir}/../..</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
<SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
</environmentVariables>
</configuration>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View file

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import org.apache.spark.util.IntParam
import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
parseArgs(args.toList)
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (! args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail
case ("--args") :: value :: tail =>
userArgsBuffer += value
args = tail
case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
args = tail
case ("--worker-memory") :: IntParam(value) :: tail =>
workerMemory = value
args = tail
case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
args = tail
case Nil =>
if (userJar == null || userClass == null) {
printUsageAndExit(1)
}
case _ =>
printUsageAndExit(1, args)
}
}
userArgs = userArgsBuffer.readOnly
}
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
"Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
System.exit(exitCode)
}
}

View file

@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import org.apache.spark.SparkConf
import org.apache.spark.util.MemoryParam
import org.apache.spark.util.IntParam
import collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
parseArgs(args.toList)
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
var args = inputArgs
while (! args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail
case ("--args") :: value :: tail =>
userArgsBuffer += value
args = tail
case ("--master-class") :: value :: tail =>
amClass = value
args = tail
case ("--master-memory") :: MemoryParam(value) :: tail =>
amMemory = value
args = tail
case ("--worker-memory") :: MemoryParam(value) :: tail =>
workerMemory = value
args = tail
case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
args = tail
case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
args = tail
case ("--queue") :: value :: tail =>
amQueue = value
args = tail
case ("--name") :: value :: tail =>
appName = value
args = tail
case ("--addJars") :: value :: tail =>
addJars = value
args = tail
case ("--files") :: value :: tail =>
files = value
args = tail
case ("--archives") :: value :: tail =>
archives = value
args = tail
case Nil =>
if (userJar == null || userClass == null) {
printUsageAndExit(1)
}
case _ =>
printUsageAndExit(1, args)
}
}
userArgs = userArgsBuffer.readOnly
inputFormatInfo = inputFormatMap.values.toList
}
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
" --files files Comma separated list of files to be distributed with the job.\n" +
" --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
}

View file

@ -1,228 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records.LocalResource
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
import org.apache.spark.Logging
import scala.collection.mutable.HashMap
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.Map
/** Client side methods to setup the Hadoop distributed cache */
class ClientDistributedCacheManager() extends Logging {
private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
LinkedHashMap[String, Tuple3[String, String, String]]()
private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
LinkedHashMap[String, Tuple3[String, String, String]]()
/**
* Add a resource to the list of distributed cache resources. This list can
* be sent to the ApplicationMaster and possibly the workers so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
the stats of the resources so they can be sent to the workers and verified.
*
* @param fs FileSystem
* @param conf Configuration
* @param destPath path to the resource
* @param localResources localResource hashMap to insert the resource into
* @param resourceType LocalResourceType
* @param link link presented in the distributed cache to the destination
* @param statCache cache to store the file/directory stats
* @param appMasterOnly Whether to only add the resource to the app master
*/
def addResource(
fs: FileSystem,
conf: Configuration,
destPath: Path,
localResources: HashMap[String, LocalResource],
resourceType: LocalResourceType,
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false) = {
val destStatus = fs.getFileStatus(destPath)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
amJarRsrc.setVisibility(visibility)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
localResources(link) = amJarRsrc
if (appMasterOnly == false) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
} else {
distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
}
}
}
/**
* Adds the necessary cache file env variables to the env passed in
* @param env
*/
def setDistFilesEnv(env: Map[String, String]) = {
val (keys, tupleValues) = distCacheFiles.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
}
}
/**
* Adds the necessary cache archive env variables to the env passed in
* @param env
*/
def setDistArchivesEnv(env: Map[String, String]) = {
val (keys, tupleValues) = distCacheArchives.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
}
}
/**
* Returns the local resource visibility depending on the cache file permissions
* @param conf
* @param uri
* @param statCache
* @return LocalResourceVisibility
*/
def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
if (isPublic(conf, uri, statCache)) {
return LocalResourceVisibility.PUBLIC
}
return LocalResourceVisibility.PRIVATE
}
/**
* Returns a boolean to denote whether a cache file is visible to all (public)
* or not
* @param conf
* @param uri
* @param statCache
* @return true if the path in the uri is visible to all, false otherwise
*/
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
//the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false
}
return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
}
/**
* Returns true if all ancestors of the specified path have the 'execute'
* permission set for all users (i.e. that other users can traverse
* the directory hierarchy to the given path)
* @param fs
* @param path
* @param statCache
* @return true if all ancestors have the 'execute' permission set for all users
*/
def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
var current = path
while (current != null) {
//the subdirs in the path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false
}
current = current.getParent()
}
return true
}
/**
* Checks for a given path whether the Other permissions on it
* imply the permission in the passed FsAction
* @param fs
* @param path
* @param action
* @param statCache
* @return true if the path in the uri is visible to all, false otherwise
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
return true
}
return false
}
/**
* Checks to see if the given uri exists in the cache, if it does it
* returns the existing FileStatus, otherwise it stats the uri, stores
* it in the cache, and returns the FileStatus.
* @param fs
* @param uri
* @param statCache
* @return FileStatus
*/
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>
val newStat = fs.getFileStatus(new Path(uri))
statCache.put(uri, newStat)
newStat
}
return stat
}
}

View file

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
/**
* Contains util methods to interact with Hadoop from spark.
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
override def isYarnMode(): Boolean = { true }
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
// Always create a new config, don't reuse yarnConf.
override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
}

View file

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnAllocationHandler
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
/**
*
* This scheduler launches workers through YARN - by calling into Client to launch WorkerLauncher as the AM.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
def this(sc: SparkContext) = this(sc, new Configuration())
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val retval = YarnAllocationHandler.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
override def postStartHook() {
// The yarn application is running, but the workers might not be ready yet
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
}
}

View file

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
var client: Client = null
var appId: ApplicationId = null
override def start() {
super.start()
val defalutWorkerCores = "2"
val defalutWorkerMemory = "512m"
val defaultWorkerNumber = "1"
val userJar = System.getenv("SPARK_YARN_APP_JAR")
var workerCores = System.getenv("SPARK_WORKER_CORES")
var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
if (userJar == null)
throw new SparkException("env SPARK_YARN_APP_JAR is not set")
if (workerCores == null)
workerCores = defalutWorkerCores
if (workerMemory == null)
workerMemory = defalutWorkerMemory
if (workerNumber == null)
workerNumber = defaultWorkerNumber
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](
"--class", "notused",
"--jar", userJar,
"--args", hostport,
"--worker-memory", workerMemory,
"--worker-cores", workerCores,
"--num-workers", workerNumber,
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
)
val args = new ClientArguments(argsArray, conf)
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
}
def waitForApp() {
// TODO : need a better way to find out whether the workers are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
"\t appStartTime: " + report.getStartTime() + "\n" +
"\t yarnAppState: " + report.getYarnApplicationState() + "\n"
)
// Ready to go, or already gone.
val state = report.getYarnApplicationState()
if (state == YarnApplicationState.RUNNING) {
return
} else if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
throw new SparkException("Yarn application already ended," +
"might be killed or not able to launch application master.")
}
Thread.sleep(1000)
}
}
override def stop() {
super.stop()
client.stop()
logInfo("Stoped")
}
}

View file

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
/**
*
* This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
* ApplicationMaster, etc. is done
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
extends TaskSchedulerImpl(sc) {
logInfo("Created YarnClusterScheduler")
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize the application master, which needs the SparkContext to determine how to allocate.
// Note that only the first creation of a SparkContext influences this (and ideally, there must be only one SparkContext, right?).
// Subsequent creations are ignored, since nodes are already allocated by then.
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val retval = YarnAllocationHandler.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
if (sparkContextInitialized){
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(3000L)
}
logInfo("YarnClusterScheduler.postStartHook done")
}
}

View file

@ -1,220 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.net.URI
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito.when
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records.LocalResource
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
return LocalResourceVisibility.PRIVATE
}
}
test("test getFileStatus empty") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
val uri = new URI("/tmp/testing")
when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val stat = distMgr.getFileStatus(fs, uri, statCache)
assert(stat.getPath() === null)
}
test("test getFileStatus cached") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
val uri = new URI("/tmp/testing")
val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
val stat = distMgr.getFileStatus(fs, uri, statCache)
assert(stat.getPath().toString() === "/tmp/testing")
}
test("test addResource") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 0)
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
//add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing2"))
val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
statCache, false)
val resource2 = localResources("link2")
assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
assert(resource2.getTimestamp() === 10)
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
val env2 = new HashMap[String, String]()
distMgr.setDistFilesEnv(env2)
val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val files = env2("SPARK_YARN_CACHE_FILES").split(',')
val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(timestamps(0) === "0")
assert(sizes(0) === "0")
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
assert(timestamps(1) === "10")
assert(sizes(1) === "20")
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
test("test addResource link null") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
intercept[Exception] {
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
statCache, false)
}
assert(localResources.get("link") === None)
assert(localResources.size === 0)
}
test("test addResource appmaster only") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, true)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 10)
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
distMgr.setDistArchivesEnv(env)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
}
test("test addResource archive") {
val distMgr = new MockClientDistributedCacheManager()
val fs = mock[FileSystem]
val conf = new Configuration()
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
assert(resource.getTimestamp() === 10)
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
val env = new HashMap[String, String]()
distMgr.setDistArchivesEnv(env)
assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
distMgr.setDistFilesEnv(env)
assert(env.get("SPARK_YARN_CACHE_FILES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
}
}

32
yarn/stable/pom.xml Normal file
View file

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>yarn-parent_2.10</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Stable API</name>
</project>

View file

@ -59,17 +59,19 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// This actor just works as a monitor to watch the driver actor.
class MonitorActor(driverUrl: String) extends Actor {
var driver: ActorSelection = null
var driver: ActorSelection = _
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message so the connection is actually established and we can monitor lifecycle events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
case x: DisassociatedEvent =>
logInfo("Driver terminated or disconnected! Shutting down.")
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
}
}

View file

@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.{RackResolver, Records}
object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
}