[SPARK-13904][SCHEDULER] Add support for pluggable cluster manager

## What changes were proposed in this pull request?

This commit adds support for a pluggable cluster manager. It also allows a cluster manager to clean up tasks without taking the parent process down.

To plug in a new external cluster manager, implement the ExternalClusterManager trait. It returns the task scheduler and scheduler backend that SparkContext will use to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (the same mechanism used to register data sources such as Parquet, JSON, and JDBC), which allows implementations of the ExternalClusterManager interface to be loaded automatically. A minimal sketch of an implementation is shown below.
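
The following is a hedged sketch only: `MyClusterManager`, `MyTaskScheduler`, `MySchedulerBackend`, and the `mycluster://` URL scheme are hypothetical placeholders and not part of this patch; the trait's method signatures are taken from the diff below.

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Hypothetical cluster manager that claims master URLs of the form "mycluster://...".
class MyClusterManager extends ExternalClusterManager {

  // SparkContext calls this with the configured master URL; return true to claim it.
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mycluster://")

  // Build the TaskScheduler that will be responsible for task handling.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new MyTaskScheduler(sc) // placeholder TaskScheduler implementation

  // Build the SchedulerBackend that talks to the external resource manager.
  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    new MySchedulerBackend(scheduler, masterURL) // placeholder SchedulerBackend implementation

  // Called after both components are created, so they can be wired together if needed.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
}
```

The implementation is registered by shipping a classpath resource named `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager` containing its fully qualified class name (the test resource added in this patch does exactly that for DummyExternalClusterManager), and it is selected by setting the matching master URL, e.g. `new SparkConf().setMaster("mycluster://host:port")`.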

Currently, when a driver fails, executors exit by calling System.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence, this patch:

  1. Moves the System.exit call into a function that can be overridden in subclasses of CoarseGrainedExecutorBackend (see the sketch after this list).
  2. Adds the ability to kill all the running tasks in an executor.
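
As a rough illustration of how these two pieces combine (this is not code from the patch; the enclosing subclass of CoarseGrainedExecutorBackend is hypothetical and its constructor parameters are elided because they do not appear in this diff):

```scala
// Inside a hypothetical subclass of CoarseGrainedExecutorBackend that is hosted
// by a long-lived parent process:
override protected def exitExecutor(): Unit = {
  // Kill the running tasks via the new Executor.killAllTasks added by this patch,
  // instead of calling System.exit(1) and taking the whole JVM down.
  Option(executor).foreach(_.killAllTasks(interruptThread = true))
  // Any further cleanup or notification of the embedding cluster manager goes here.
}
```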

## How was this patch tested?
ExternalClusterManagerSuite.scala was added to test this patch.

Author: Hemant Bhanawat <hemant@snappydata.io>

Closes #11723 from hbhanawat/pluggableScheduler.

@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -2453,10 +2453,33 @@ object SparkContext extends Logging {
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
        createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }

  private def getClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders =
      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
        s"for the url $url:")
    }
    serviceLoaders.headOption
  }
}

/**


@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
      // Always receive `true`. Just ignore it
      case Failure(e) =>
        logError(s"Cannot register with driver: $driverUrl", e)
        System.exit(1)
        exitExecutor()
    }(ThreadUtils.sameThread)
  }
@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(
    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)
      exitExecutor()

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
        exitExecutor()
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
        exitExecutor()
      } else {
        executor.killTask(taskId, interruptThread)
      }
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
    } else if (driver.exists(_.address == remoteAddress)) {
      logError(s"Driver $remoteAddress disassociated! Shutting down.")
      System.exit(1)
      exitExecutor()
    } else {
      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
    }
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

  /**
   * This function can be overridden by subclasses to handle
   * executor exits differently. For example, when an executor goes down,
   * the backend may not want to take the parent process down.
   */
  protected def exitExecutor(): Unit = System.exit(1)
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {


@@ -153,6 +153,21 @@ private[spark] class Executor(
    }
  }

  /**
   * Function to kill the running tasks in an executor.
   * This can be called by executor back-ends to kill the
   * tasks instead of taking the JVM down.
   * @param interruptThread whether to interrupt the task thread
   */
  def killAllTasks(interruptThread: Boolean): Unit = {
    // kill all the running tasks
    for (taskRunner <- runningTasks.values().asScala) {
      if (taskRunner != null) {
        taskRunner.kill(interruptThread)
      }
    }
  }

  def stop(): Unit = {
    env.metricsSystem.report()
    heartbeater.shutdown()


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * A cluster manager interface to plug in an external scheduler.
 */
@DeveloperApi
trait ExternalClusterManager {

  /**
   * Check if this cluster manager instance can create scheduler components
   * for a certain master URL.
   * @param masterURL the master URL
   * @return True if the cluster manager can create scheduler backend/task scheduler
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create a task scheduler instance for the given SparkContext
   * @param sc SparkContext
   * @param masterURL the master URL
   * @return TaskScheduler that will be responsible for task handling
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create a scheduler backend for the given SparkContext and scheduler. This is
   * called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
   * @param sc SparkContext
   * @param masterURL the master URL
   * @param scheduler TaskScheduler that will be used with the scheduler backend.
   * @return SchedulerBackend that works with a TaskScheduler
   */
  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  /**
   * Initialize task scheduler and backend scheduler. This is called after the
   * scheduler components are created.
   * @param scheduler TaskScheduler that will be responsible for task handling
   * @param backend SchedulerBackend that works with a TaskScheduler
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}


@@ -0,0 +1 @@
org.apache.spark.scheduler.DummyExternalClusterManager


@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId

class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
{
  test("launch of backend and scheduler") {
    val conf = new SparkConf().setMaster("myclusterManager").
      setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
    sc = new SparkContext(conf)
    // check if the scheduler components are created
    assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
    assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
  }
}

private class DummyExternalClusterManager extends ExternalClusterManager {

  def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"

  def createTaskScheduler(sc: SparkContext,
      masterURL: String): TaskScheduler = new DummyTaskScheduler

  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()

  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
}

private class DummySchedulerBackend extends SchedulerBackend {
  def start() {}
  def stop() {}
  def reviveOffers() {}
  def defaultParallelism(): Int = 1
}

private class DummyTaskScheduler extends TaskScheduler {
  override def rootPool: Pool = null
  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def submitTasks(taskSet: TaskSet): Unit = {}
  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
  override def defaultParallelism(): Int = 2
  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
  override def applicationAttemptId(): Option[String] = None
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
      blockManagerId: BlockManagerId): Boolean = true
}


@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
spark-deps-.*
.*csv
.*tsv
org.apache.spark.scheduler.ExternalClusterManager