[SPARK-16967] move mesos to module
## What changes were proposed in this pull request?

Move the Mesos code into its own Maven module.

## How was this patch tested?

- unit tests
- manually submitting a client-mode and a cluster-mode job
- spark/mesos integration test suite

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14637 from mgummelt/mesos-module.
Parent: c0949dc944
Commit: 8e5475be3c
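
Before the file-by-file diff, a quick orientation (not part of the diff itself): after this change the Mesos scheduler code compiles only when the new `mesos` profile is enabled. A minimal sketch of building and testing with it, using the `-Pmesos` profile and the `mesos/test` SBT goal that this patch introduces; paths assume a checkout of this branch:

```sh
# Build Spark with the Mesos module enabled (the -Pmesos profile is added by this PR).
./build/mvn -Pmesos -DskipTests clean package

# Run only the new module's test suites through SBT.
./build/sbt -Pmesos mesos/test
```
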
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:
@@ -138,6 +138,16 @@
        </dependency>
      </dependencies>
    </profile>
+   <profile>
+     <id>mesos</id>
+     <dependencies>
+       <dependency>
+         <groupId>org.apache.spark</groupId>
+         <artifactId>spark-mesos_${scala.binary.version}</artifactId>
+         <version>${project.version}</version>
+       </dependency>
+     </dependencies>
+   </profile>
    <profile>
      <id>hive</id>
      <dependencies>
@@ -215,11 +215,6 @@
       <groupId>org.glassfish.jersey.containers</groupId>
       <artifactId>jersey-container-servlet-core</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.mesos</groupId>
-      <artifactId>mesos</artifactId>
-      <classifier>${mesos.classifier}</classifier>
-    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
@@ -56,7 +55,6 @@ import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -2512,18 +2510,6 @@ object SparkContext extends Logging {
        }
        (backend, scheduler)

-      case MESOS_REGEX(mesosUrl) =>
-        MesosNativeLibrary.load()
-        val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
-        val backend = if (coarseGrained) {
-          new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
-        } else {
-          new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
-        }
-        scheduler.initialize(backend)
-        (backend, scheduler)
-
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
@@ -2545,7 +2531,7 @@
   private def getClusterManager(url: String): Option[ExternalClusterManager] = {
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoaders =
       ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
     if (serviceLoaders.size > 1) {
       throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
         s"for the url $url:")
@@ -2566,8 +2552,6 @@ private object SparkMasterRegex {
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """spark://(.*)""".r
-  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
-  val MESOS_REGEX = """mesos://(.*)""".r
 }

 /**
@@ -17,8 +17,6 @@

 package org.apache.spark

-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
 private[spark] object TaskState extends Enumeration {

   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration {
   def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

   def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
-  }
 }
@@ -22,7 +22,6 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend

@@ -130,31 +129,4 @@ class SparkContextSchedulerCreationSuite
       case _ => fail()
     }
   }
-
-  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
-    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
-    try {
-      val sched = createTaskScheduler(master, "client", conf)
-      assert(sched.backend.getClass === expectedClass)
-    } catch {
-      case e: UnsatisfiedLinkError =>
-        assert(e.getMessage.contains("mesos"))
-        logWarning("Mesos not available, could not test actual Mesos scheduler creation")
-      case e: Throwable => fail(e)
-    }
-  }
-
-  test("mesos fine-grained") {
-    testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
-  test("mesos coarse-grained") {
-    testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
-  }
-
-  test("mesos with zookeeper") {
-    testMesos("mesos://zk://localhost:1234,localhost:2345",
-      classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
 }
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
 BASE_DIR=$(pwd)

 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
 PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"

 rm -rf spark
@@ -186,12 +186,13 @@ if [[ "$1" == "package" ]]; then

   # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
   # share the same Zinc server.
-  make_binary_release "hadoop2.3" "-Psparkr -Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
-  make_binary_release "hadoop2.4" "-Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
-  make_binary_release "hadoop2.6" "-Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn" "3035" &
-  make_binary_release "hadoop2.7" "-Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn" "3036" &
-  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn" "3037" &
-  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn" "3038" &
+  FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
+  make_binary_release "hadoop2.3" "-Phadoop2.3 $FLAGS" "3033" &
+  make_binary_release "hadoop2.4" "-Phadoop2.4 $FLAGS" "3034" &
+  make_binary_release "hadoop2.6" "-Phadoop2.6 $FLAGS" "3035" &
+  make_binary_release "hadoop2.7" "-Phadoop2.7 $FLAGS" "3036" &
+  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
+  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/
@@ -20,7 +20,7 @@
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"

-ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
+ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)

 if test ! -z "$ERRORS"; then
     echo -e "Checkstyle checks failed at following occurrences:\n$ERRORS"

dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"

-SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
@@ -22,6 +22,7 @@
 ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
+        -Pmesos \
         -Pyarn \
         -Phive \
        -Phive-thriftserver \
@@ -458,6 +458,13 @@ yarn = Module(
     ]
 )

+mesos = Module(
+    name="mesos",
+    dependencies=[],
+    source_file_regexes=["mesos/"],
+    sbt_test_goals=["mesos/test"]
+)
+
 # The root module is a dummy module which is used to run all of the tests.
 # No other modules should directly depend on this module.
 root = Module(
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.

 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.2
@@ -50,7 +50,7 @@ To create a Spark distribution like those distributed by the
 to be runnable, use `./dev/make-distribution.sh` in the project root directory. It can be configured
 with Maven profile settings and so on like the direct Maven build. Example:

-    ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
+    ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn

 For more information on usage, run `./dev/make-distribution.sh --help`
@@ -105,13 +105,17 @@ By default Spark will build with Hive 1.2.1 bindings.

 ## Packaging without Hadoop Dependencies for YARN

 The assembly directory produced by `mvn package` will, by default, include all of Spark's
 dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this
 causes multiple versions of these to appear on executor classpaths: the version packaged in
 the Spark assembly and the version on each node, included with `yarn.application.classpath`.
 The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects,
 like ZooKeeper and Hadoop itself.

+## Building with Mesos support
+
+    ./build/mvn -Pmesos -DskipTests clean package
+
 ## Building for Scala 2.10
 To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property:
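
A short usage note, not part of the diff itself: once a distribution is built with `-Pmesos`, jobs are submitted the same way as before the refactor; the commit message above notes that client-mode and cluster-mode submission were verified manually. A minimal client-mode sketch, assuming a Mesos master at `host:5050` and the bundled SparkPi example (the jar path is illustrative and depends on how the distribution was laid out); cluster mode additionally goes through the Mesos cluster dispatcher, whose scheduler code also moves into the new module:

```sh
# Hypothetical master URL and example jar location; adjust for your build.
./bin/spark-submit \
  --master mesos://host:5050 \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.11-2.1.0-SNAPSHOT.jar 100
```
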
@@ -263,17 +267,17 @@ The run-tests script also can be limited to a specific Python version or a speci

 ## Running R Tests

 To run the SparkR tests you will need to install the R package `testthat`
 (run `install.packages(testthat)` from R shell). You can run just the SparkR tests using
 the command:

     ./R/run-tests.sh

 ## Running Docker-based Integration Test Suites

 In order to run Docker integration tests, you have to install the `docker` engine on your box.
 The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/).
 Once installed, the `docker` service needs to be started, if not already running.
 On Linux, this can be done by `sudo service docker start`.

     ./build/mvn install -DskipTests

mesos/pom.xml (new file)
@@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>spark-mesos_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Mesos</name>
  <properties>
    <sbt.project.name>mesos</sbt.project.name>
    <mesos.version>1.0.0</mesos.version>
    <mesos.classifier>shaded-protobuf</mesos.classifier>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.mesos</groupId>
      <artifactId>mesos</artifactId>
      <version>${mesos.version}</version>
      <classifier>${mesos.classifier}</classifier>
      <exclusions>
        <exclusion>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-plus</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-servlet</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-servlets</artifactId>
    </dependency>
    <!-- End of shaded deps. -->

  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>

@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.mesos.MesosClusterManager
@@ -26,25 +26,26 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 import org.apache.mesos.protobuf.ByteString

 import org.apache.spark.{SparkConf, SparkEnv, TaskState}
-import org.apache.spark.TaskState.TaskState
+import org.apache.spark.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
 import org.apache.spark.util.Utils

 private[spark] class MesosExecutorBackend
   extends MesosExecutor
+  with MesosSchedulerUtils // TODO: fix
   with ExecutorBackend
   with Logging {

   var executor: Executor = null
   var driver: ExecutorDriver = null

-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+  override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) {
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
     driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
       .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
+      .setState(taskStateToMesos(state))
       .setData(ByteString.copyFrom(data))
       .build())
   }

@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Mesos scheduler and backend
 */
private[spark] class MesosClusterManager extends ExternalClusterManager {
  private val MESOS_REGEX = """mesos://(.*)""".r

  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("mesos")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl,
        sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl],
        sc,
        mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
@@ -680,7 +680,7 @@ private[spark] class MesosClusterScheduler(
         retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
       pendingRetryDrivers += newDriverDescription
       pendingRetryDriversState.persist(taskId, newDriverDescription)
-    } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
+    } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
       removeFromLaunchedDrivers(taskId)
       state.finishDate = Some(new Date())
       if (finishedDrivers.size >= retainedDrivers) {
@@ -473,7 +473,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue
     val slaveId = status.getSlaveId.getValue
-    val state = TaskState.fromMesos(status.getState)
+    val state = mesosToTaskState(status.getState)

     logInfo(s"Mesos task $taskId is now ${status.getState}")
@@ -366,9 +366,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
     inClassLoader() {
       val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
+      val state = mesosToTaskState(status.getState)
       synchronized {
-        if (TaskState.isFailed(TaskState.fromMesos(status.getState))
+        if (TaskState.isFailed(mesosToTaskState(status.getState))
           && taskIdToSlaveId.contains(tid)) {
           // We lost the executor on this slave, so remember that it's gone
           removeExecutor(taskIdToSlaveId(tid), "Lost executor")
@@ -26,19 +26,21 @@ import scala.util.control.NonFatal

 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}

 import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.TaskState
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils


 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
  */
-private[mesos] trait MesosSchedulerUtils extends Logging {
+trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered
   private final val registerLatch = new CountDownLatch(1)
@@ -491,4 +493,22 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.remove("spark.mesos.driver.frameworkId")
     System.clearProperty("spark.mesos.driver.frameworkId")
   }
+
+  def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+    case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
+    case MesosTaskState.TASK_FAILED => TaskState.FAILED
+    case MesosTaskState.TASK_KILLED => TaskState.KILLED
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+  }
+
+  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
+    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
+    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
+    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
+    case TaskState.FAILED => MesosTaskState.TASK_FAILED
+    case TaskState.KILLED => MesosTaskState.TASK_KILLED
+    case TaskState.LOST => MesosTaskState.TASK_LOST
+  }
 }

@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}

class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
  def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) {
    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
    sc = new SparkContext("local", "test", conf)
    val clusterManager = new MesosClusterManager()

    assert(clusterManager.canCreate(masterURL))
    val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL)
    val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler)
    assert(sched.getClass === expectedClass)
  }

  test("mesos fine-grained") {
    testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
  }

  test("mesos coarse-grained") {
    testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
  }

  test("mesos with zookeeper") {
    testURL("mesos://zk://localhost:1234,localhost:2345",
        classOf[MesosFineGrainedSchedulerBackend],
        coarse = false)
  }
}
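
A testing note, not part of the diff itself: because the suite above lives in the standalone `mesos` project, a single suite can be targeted through SBT. A sketch, assuming the `mesos` project name set in the new pom.xml (`sbt.project.name`) and SBT's standard `testOnly` task:

```sh
./build/sbt -Pmesos "mesos/testOnly org.apache.spark.scheduler.cluster.mesos.MesosClusterManagerSuite"
```
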

pom.xml
@@ -119,8 +119,6 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>1.0.0</mesos.version>
-    <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>2.2.0</hadoop.version>
@@ -527,18 +525,6 @@
        <version>${protobuf.version}</version>
        <scope>${hadoop.deps.scope}</scope>
      </dependency>
-     <dependency>
-       <groupId>org.apache.mesos</groupId>
-       <artifactId>mesos</artifactId>
-       <version>${mesos.version}</version>
-       <classifier>${mesos.classifier}</classifier>
-       <exclusions>
-         <exclusion>
-           <groupId>com.google.protobuf</groupId>
-           <artifactId>protobuf-java</artifactId>
-         </exclusion>
-       </exclusions>
-     </dependency>
      <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
@@ -2527,6 +2513,13 @@
       </modules>
     </profile>

+    <profile>
+      <id>mesos</id>
+      <modules>
+        <module>mesos</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>hive-thriftserver</id>
       <modules>
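
A build note, not part of the diff itself: since the new directory is wired in as an optional Maven module by the profile above, it can also be built on its own with standard Maven reactor flags. A sketch, assuming no prior local install (`-pl` selects the module, `-am` also builds the modules it depends on, such as spark-core):

```sh
./build/mvn -Pmesos -pl mesos -am -DskipTests package
```
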
@@ -40,7 +40,9 @@ object MimaExcludes {
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select")
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
+      // [SPARK-16967] Move Mesos to Module
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX")
     )
   }
@@ -56,9 +56,9 @@ object BuildCommons {
     "tags", "sketch"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

-  val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl,
+  val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl,
     streamingKinesisAsl, dockerIntegrationTests) =
-    Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
+    Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests").map(ProjectRef(buildLocation, _))

   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =