Merge pull request #34 from kayousterhout/rename
Renamed StandaloneX to CoarseGrainedX. (as suggested by @rxin here https://github.com/apache/incubator-spark/pull/14) The previous names were confusing because the components weren't just used in Standalone mode. The scheduler used for Standalone mode is called SparkDeploySchedulerBackend, so referring to the base class as StandaloneSchedulerBackend was misleading.
This commit is contained in:
commit
6dbd2208ff
|
@ -56,7 +56,7 @@ import org.apache.spark.deploy.LocalSparkCluster
|
|||
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
|
||||
import org.apache.spark.rdd._
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
|
||||
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
|
||||
ClusterScheduler}
|
||||
import org.apache.spark.scheduler.local.LocalScheduler
|
||||
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
|
||||
|
@ -213,7 +213,7 @@ class SparkContext(
|
|||
throw new SparkException("YARN mode not available ?", th)
|
||||
}
|
||||
}
|
||||
val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
|
||||
val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
|
||||
scheduler.initialize(backend)
|
||||
scheduler
|
||||
|
||||
|
|
|
@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie
|
|||
|
||||
import org.apache.spark.{Logging, SparkEnv}
|
||||
import org.apache.spark.TaskState.TaskState
|
||||
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
|
||||
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
|
||||
import org.apache.spark.util.{Utils, AkkaUtils}
|
||||
|
||||
|
||||
private[spark] class StandaloneExecutorBackend(
|
||||
private[spark] class CoarseGrainedExecutorBackend(
|
||||
driverUrl: String,
|
||||
executorId: String,
|
||||
hostPort: String,
|
||||
|
@ -87,7 +87,7 @@ private[spark] class StandaloneExecutorBackend(
|
|||
}
|
||||
}
|
||||
|
||||
private[spark] object StandaloneExecutorBackend {
|
||||
private[spark] object CoarseGrainedExecutorBackend {
|
||||
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
|
||||
// Debug code
|
||||
Utils.checkHost(hostname)
|
||||
|
@ -99,7 +99,7 @@ private[spark] object StandaloneExecutorBackend {
|
|||
val sparkHostPort = hostname + ":" + boundPort
|
||||
System.setProperty("spark.hostPort", sparkHostPort)
|
||||
val actor = actorSystem.actorOf(
|
||||
Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
|
||||
Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
|
||||
name = "Executor")
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
|
@ -107,7 +107,9 @@ private[spark] object StandaloneExecutorBackend {
|
|||
def main(args: Array[String]) {
|
||||
if (args.length < 4) {
|
||||
//the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
|
||||
System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
|
||||
System.err.println(
|
||||
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
|
||||
"[<appid>]")
|
||||
System.exit(1)
|
||||
}
|
||||
run(args(0), args(1), args(2), args(3).toInt)
|
|
@ -24,28 +24,28 @@ import org.apache.spark.scheduler.TaskDescription
|
|||
import org.apache.spark.util.{Utils, SerializableBuffer}
|
||||
|
||||
|
||||
private[spark] sealed trait StandaloneClusterMessage extends Serializable
|
||||
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
|
||||
|
||||
private[spark] object StandaloneClusterMessages {
|
||||
private[spark] object CoarseGrainedClusterMessages {
|
||||
|
||||
// Driver to executors
|
||||
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
|
||||
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
|
||||
|
||||
case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage
|
||||
case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
|
||||
|
||||
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
|
||||
extends StandaloneClusterMessage
|
||||
extends CoarseGrainedClusterMessage
|
||||
|
||||
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
|
||||
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
|
||||
|
||||
// Executors to driver
|
||||
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
|
||||
extends StandaloneClusterMessage {
|
||||
extends CoarseGrainedClusterMessage {
|
||||
Utils.checkHostPort(hostPort, "Expected host port")
|
||||
}
|
||||
|
||||
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
|
||||
data: SerializableBuffer) extends StandaloneClusterMessage
|
||||
data: SerializableBuffer) extends CoarseGrainedClusterMessage
|
||||
|
||||
object StatusUpdate {
|
||||
/** Alternate factory method that takes a ByteBuffer directly for the data field */
|
||||
|
@ -56,10 +56,10 @@ private[spark] object StandaloneClusterMessages {
|
|||
}
|
||||
|
||||
// Internal messages in driver
|
||||
case object ReviveOffers extends StandaloneClusterMessage
|
||||
case object ReviveOffers extends CoarseGrainedClusterMessage
|
||||
|
||||
case object StopDriver extends StandaloneClusterMessage
|
||||
case object StopDriver extends CoarseGrainedClusterMessage
|
||||
|
||||
case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
|
||||
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
|
||||
|
||||
}
|
|
@ -30,16 +30,19 @@ import akka.util.duration._
|
|||
|
||||
import org.apache.spark.{SparkException, Logging, TaskState}
|
||||
import org.apache.spark.scheduler.TaskDescription
|
||||
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
|
||||
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* A standalone scheduler backend, which waits for standalone executors to connect to it through
|
||||
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
|
||||
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
|
||||
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
|
||||
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
|
||||
* executors whenever a task is done and asking the scheduler to launch a new executor for
|
||||
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
|
||||
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
|
||||
* (spark.deploy.*).
|
||||
*/
|
||||
private[spark]
|
||||
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
|
||||
class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
|
||||
extends SchedulerBackend with Logging
|
||||
{
|
||||
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
|
||||
|
@ -162,7 +165,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
|
|||
}
|
||||
}
|
||||
driverActor = actorSystem.actorOf(
|
||||
Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
|
||||
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
|
||||
}
|
||||
|
||||
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
|
||||
|
@ -202,6 +205,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
|
|||
}
|
||||
}
|
||||
|
||||
private[spark] object StandaloneSchedulerBackend {
|
||||
val ACTOR_NAME = "StandaloneScheduler"
|
||||
private[spark] object CoarseGrainedSchedulerBackend {
|
||||
val ACTOR_NAME = "CoarseGrainedScheduler"
|
||||
}
|
|
@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
sc: SparkContext,
|
||||
masters: Array[String],
|
||||
appName: String)
|
||||
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
|
||||
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
|
||||
with ClientListener
|
||||
with Logging {
|
||||
|
||||
|
@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
// The endpoint for executors to talk to us
|
||||
val driverUrl = "akka://spark@%s:%s/user/%s".format(
|
||||
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
|
||||
StandaloneSchedulerBackend.ACTOR_NAME)
|
||||
CoarseGrainedSchedulerBackend.ACTOR_NAME)
|
||||
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
|
||||
val command = Command(
|
||||
"org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
|
||||
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
|
||||
val sparkHome = sc.getSparkHome().getOrElse(null)
|
||||
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
|
||||
"http://" + sc.ui.appUIAddress)
|
||||
|
|
|
@ -30,13 +30,14 @@ import org.apache.mesos._
|
|||
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
|
||||
|
||||
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
|
||||
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
|
||||
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
|
||||
|
||||
/**
|
||||
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
|
||||
* onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
|
||||
* a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
|
||||
* StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
|
||||
* CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
|
||||
* latency.
|
||||
*
|
||||
* Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
|
||||
* remove this.
|
||||
|
@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend(
|
|||
sc: SparkContext,
|
||||
master: String,
|
||||
appName: String)
|
||||
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
|
||||
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
|
||||
with MScheduler
|
||||
with Logging {
|
||||
|
||||
|
@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend(
|
|||
val driverUrl = "akka://spark@%s:%s/user/%s".format(
|
||||
System.getProperty("spark.driver.host"),
|
||||
System.getProperty("spark.driver.port"),
|
||||
StandaloneSchedulerBackend.ACTOR_NAME)
|
||||
CoarseGrainedSchedulerBackend.ACTOR_NAME)
|
||||
val uri = System.getProperty("spark.executor.uri")
|
||||
if (uri == null) {
|
||||
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
|
||||
command.setValue(
|
||||
"\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
|
||||
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
|
||||
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
|
||||
} else {
|
||||
// Grab everything to the first '.'. We'll use that and '*' to
|
||||
// glob the directory "correctly".
|
||||
val basename = uri.split('/').last.split('.').head
|
||||
command.setValue(
|
||||
"cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
|
||||
basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
|
||||
"cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
|
||||
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
|
||||
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
|
||||
}
|
||||
return command.build()
|
||||
|
|
Loading…
Reference in a new issue