[SPARK-13604][CORE] Sync worker's state after registering with master

## What changes were proposed in this pull request?

Here lists all cases that Master cannot talk with Worker for a while and then network is back.

1. Master doesn't know the network issue (not yet timeout)

  a. Worker doesn't know the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat. Both Worker and Master don't know the network issue. Nothing to do. (Finally, Master will notice the heartbeat timeout if network is not recovered)

  b. Worker knows the network issue (onDisconnected is called)
    - Worker stops sending Heartbeat and sends `RegisterWorker` to master. Master will reply `RegisterWorkerFailed("Duplicate worker ID")`. Worker calls "System.exit(1)" (Finally, Master will notice the heartbeat timeout if network is not recovered) (May leak driver processes. See [SPARK-13602](https://issues.apache.org/jira/browse/SPARK-13602))

2. Worker timeout (Master knows the network issue). In such case,  master removes Worker and its executors and drivers.

  a. Worker doesn't know the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat.
    - If the network is back, say Master receives Heartbeat, Master sends `ReconnectWorker` to Worker
    - Worker send `RegisterWorker` to master.
    - Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)

  b. Worker knows the network issue (onDisconnected is called)
    - Worker stop sending `Heartbeat`. Worker will send "RegisterWorker" to master.
    - Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)

This PR fixes executors and drivers leak in 2.a and 2.b when Worker reregisters with Master. The approach is making Worker send `WorkerLatestState` to sync the state after registering with master successfully. Then Master will ask Worker to kill unknown executors and drivers.

Note:  Worker cannot just kill executors after registering with master because in the worker, `LaunchExecutor` and `RegisteredWorker` are processed in two threads. If `LaunchExecutor` happens before `RegisteredWorker`, Worker's executor list will contain new executors after Master accepts `RegisterWorker`. We should not kill these executors. So sending the list to Master and let Master tell Worker which executors should be killed.

## How was this patch tested?

test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11455 from zsxwing/orphan-executors.
This commit is contained in:
Shixiong Zhu 2016-03-10 16:59:14 -08:00 committed by Andrew Or
parent 020ff8cd34
commit 27fe6bacc5
4 changed files with 90 additions and 5 deletions

View file

@ -64,6 +64,16 @@ private[deploy] object DeployMessages {
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription], case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String]) driverIds: Seq[String])
/**
* A worker will send this message to the master when it registers with the master. Then the
* master will compare them with the executors and drivers in the master and tell the worker to
* kill the unknown executors and drivers.
*/
case class WorkerLatestState(
id: String,
executors: Seq[ExecutorDescription],
driverIds: Seq[String]) extends DeployMessage
case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
// Master to Worker // Master to Worker

View file

@ -368,6 +368,30 @@ private[deploy] class Master(
if (canCompleteRecovery) { completeRecovery() } if (canCompleteRecovery) { completeRecovery() }
} }
case WorkerLatestState(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
case Some(worker) =>
for (exec <- executors) {
val executorMatches = worker.executors.exists {
case (_, e) => e.application.id == exec.appId && e.id == exec.execId
}
if (!executorMatches) {
// master doesn't recognize this executor. So just tell worker to kill it.
worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
}
}
for (driverId <- driverIds) {
val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
if (!driverMatches) {
// master doesn't recognize this driver. So just tell worker to kill it.
worker.endpoint.send(KillDriver(driverId))
}
}
case None =>
logWarning("Worker state from unknown worker: " + workerId)
}
case UnregisterApplication(applicationId) => case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId") logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication) idToApp.get(applicationId).foreach(finishApplication)

View file

@ -374,6 +374,11 @@ private[deploy] class Worker(
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
} }
val execs = executors.values.map { e =>
new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
case RegisterWorkerFailed(message) => case RegisterWorkerFailed(message) =>
if (!registered) { if (!registered) {
logError("Worker registration failed: " + message) logError("Worker registration failed: " + message)

View file

@ -18,22 +18,36 @@
package org.apache.spark.deploy.master package org.apache.spark.deploy.master
import java.util.Date import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.io.Source import scala.io.Source
import scala.language.postfixOps import scala.language.postfixOps
import org.json4s._ import org.json4s._
import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.JsonMethods._
import org.scalatest.{Matchers, PrivateMethodTester} import org.scalatest.{BeforeAndAfter, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory} import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._ import org.apache.spark.deploy._
import org.apache.spark.rpc.RpcEnv import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester { class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
private var _master: Master = _
after {
if (_master != null) {
_master.rpcEnv.shutdown()
_master.rpcEnv.awaitTermination()
_master = null
}
}
test("can use a custom recovery mode factory") { test("can use a custom recovery mode factory") {
val conf = new SparkConf(loadDefaults = false) val conf = new SparkConf(loadDefaults = false)
@ -357,10 +371,11 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
private val workerInfos = Array(workerInfo, workerInfo, workerInfo) private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
private def makeMaster(conf: SparkConf = new SparkConf): Master = { private def makeMaster(conf: SparkConf = new SparkConf): Master = {
assert(_master === null, "Some Master's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf) val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr) val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
val master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf) _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
master _master
} }
private def makeAppInfo( private def makeAppInfo(
@ -386,4 +401,35 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut)) master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
} }
test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") {
val master = makeMaster()
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
val masterState = master.self.askWithRetry[MasterStateResponse](RequestMasterState)
assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
}
val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
val killedDrivers = new ConcurrentLinkedQueue[String]()
val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
override val rpcEnv: RpcEnv = master.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case KillExecutor(_, appId, execId) => killedExecutors.add(appId, execId)
case KillDriver(driverId) => killedDrivers.add(driverId)
}
})
master.self.ask(
RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)
}
master.self.send(WorkerLatestState("1", executors, driverIds = Seq("0", "1", "2")))
eventually(timeout(10.seconds)) {
assert(killedExecutors.asScala.toList.sorted === List("0" -> 0, "1" -> 1, "2" -> 2))
assert(killedDrivers.asScala.toList.sorted === List("0", "1", "2"))
}
}
} }