[SPARK-17755][CORE] Use workerRef to send RegisterWorkerResponse to avoid the race condition

## What changes were proposed in this pull request?

The root cause of this issue is that RegisterWorkerResponse and LaunchExecutor are sent via two different channels (TCP connections) and their order is not guaranteed.

This PR changes the master and worker codes to use `workerRef` to send RegisterWorkerResponse, so that RegisterWorkerResponse and LaunchExecutor are sent via the same connection. Hence `LaunchExecutor` will always be after `RegisterWorkerResponse` and never be ignored.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16345 from zsxwing/SPARK-17755.
This commit is contained in:
Shixiong Zhu 2016-12-25 23:48:14 -08:00
parent d6cbec7598
commit 7026ee23e0
3 changed files with 32 additions and 41 deletions

View file

@ -231,6 +231,29 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
@ -386,30 +409,6 @@ private[deploy] class Master(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

View file

@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Random, Success}
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.spark.{SecurityManager, SparkConf}
@ -216,7 +216,7 @@ private[deploy] class Worker(
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
registerWithMaster(masterEndpoint)
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@ -272,7 +272,7 @@ private[deploy] class Worker(
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
registerWithMaster(masterEndpoint)
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@ -341,19 +341,8 @@ private[deploy] class Worker(
}
}
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl))
}
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
@ -394,6 +383,9 @@ private[deploy] class Worker(
}
override def receive: PartialFunction[Any, Unit] = synchronized {
case msg: RegisterWorkerResponse =>
handleRegisterResponse(msg)
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }

View file

@ -447,7 +447,7 @@ class MasterSuite extends SparkFunSuite
}
})
master.self.ask(
master.self.send(
RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)