[SPARK-11131][CORE] Fix race in worker registration protocol.

Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the worker was
susceptible to receiving an executor request before it received a
message from the master saying registration succeeded.

On top of the above, the change also fixes a ClassCastException when
the registration fails, which also affects the executor registration
protocol. Because the `ask` is issued with a specific return type,
if the error message (of a different type) was returned instead, the
code would just die with an exception. This is fixed by having a common
base trait for these reply messages.
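
As an aside, the shape of the fix is easy to see in isolation. The sketch below is
not Spark code: the object and method names (ReplyTraitSketch, askMaster) and the
simplified message fields are made up for illustration. It only shows the pattern the
patch adopts: every registration reply extends one sealed trait, so a single typed
ask/reply channel can carry success, failure, or standby without a ClassCastException.

    // Self-contained sketch; the real messages carry RpcEndpointRef / web UI URL fields.
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    sealed trait RegisterWorkerResponse
    case class RegisteredWorker(masterUrl: String) extends RegisterWorkerResponse
    case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
    case object MasterInStandby extends RegisterWorkerResponse

    object ReplyTraitSketch {
      // Stand-in for RpcEndpointRef.ask[RegisterWorkerResponse]: the "master" may
      // answer with any subtype of the sealed trait.
      def askMaster(duplicateId: Boolean): Future[RegisterWorkerResponse] =
        Future.successful(
          if (duplicateId) RegisterWorkerFailed("Duplicate worker ID")
          else RegisteredWorker("spark://master:7077"))

      def main(args: Array[String]): Unit = {
        // The worker handles every possible reply in one place once it arrives.
        Await.result(askMaster(duplicateId = false), 1.second) match {
          case RegisteredWorker(url)        => println(s"registered with $url")
          case RegisterWorkerFailed(reason) => println(s"registration failed: $reason")
          case MasterInStandby              => println("master in standby; ignore")
        }
      }
    }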

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9138 from vanzin/SPARK-11131.
Authored by Marcelo Vanzin on 2015-10-19 16:14:50 -07:00; committed by Andrew Or
commit 7ab0ce6501 (parent 67582132bf)
6 changed files with 87 additions and 57 deletions

DeployMessages.scala

@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {

   // Master to Worker

-  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+  sealed trait RegisterWorkerResponse

-  case class RegisterWorkerFailed(message: String) extends DeployMessage
+  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+    with RegisterWorkerResponse
+
+  case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
+
+  case object MasterInStandby extends DeployMessage with RegisterWorkerResponse

   case class ReconnectWorker(masterUrl: String) extends DeployMessage

Master.scala

@@ -233,31 +233,6 @@ private[deploy] class Master(
       System.exit(0)
     }

-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        // ignore, don't send response
-      } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerUiPort, publicAddress)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same " +
-            "address: " + workerAddress)
-          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
-            + workerAddress))
-        }
-      }
-    }
-
     case RegisterApplication(description, driver) => {
       // TODO Prevent repeated registrations from some driver
       if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ private[deploy] class Master(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterWorker(
+        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        context.reply(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerUiPort, publicAddress)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          context.reply(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress))
+        }
+      }
+    }
+
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

Worker.scala

@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut

 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.Random
+import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -213,8 +213,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -271,8 +270,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,25 +339,54 @@ private[deploy] class Worker(
     }
   }

-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, masterWebUiUrl) =>
-      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
-      registered = true
-      changeMaster(masterRef, masterWebUiUrl)
-      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-      if (CLEANUP_ENABLED) {
-        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
-        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
-          }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
-      }
-
+  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
+    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
+      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+      .onComplete {
+        // This is a very fast action so we can use "ThreadUtils.sameThread"
+        case Success(msg) =>
+          Utils.tryLogNonFatalError {
+            handleRegisterResponse(msg)
+          }
+        case Failure(e) =>
+          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
+          System.exit(1)
+      }(ThreadUtils.sameThread)
+  }
+
+  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
+    msg match {
+      case RegisteredWorker(masterRef, masterWebUiUrl) =>
+        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+        registered = true
+        changeMaster(masterRef, masterWebUiUrl)
+        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(SendHeartbeat)
+          }
+        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+        if (CLEANUP_ENABLED) {
+          logInfo(
+            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(WorkDirCleanup)
+            }
+          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        }
+
+      case RegisterWorkerFailed(message) =>
+        if (!registered) {
+          logError("Worker registration failed: " + message)
+          System.exit(1)
+        }
+
+      case MasterInStandby =>
+        // Ignore. Master not yet ready.
+    }
+  }
+
+  override def receive: PartialFunction[Any, Unit] = synchronized {
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
@@ -399,12 +426,6 @@ private[deploy] class Worker(
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
       masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

-    case RegisterWorkerFailed(message) =>
-      if (!registered) {
-        logError("Worker registration failed: " + message)
-        System.exit(1)
-      }
-
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()

CoarseGrainedExecutorBackend.scala

@@ -59,12 +59,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisteredExecutor.type](
+      ref.ask[RegisterExecutorResponse](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)

CoarseGrainedClusterMessages.scala

@@ -36,9 +36,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage

+  sealed trait RegisterExecutorResponse
+
   case object RegisteredExecutor extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse

   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse

   // Executors to driver
   case class RegisterExecutor(

HeartbeatReceiverSuite.scala

@@ -173,9 +173,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)