[SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens

## What changes were proposed in this pull request?

### Standalone HA Background

In Spark Standalone HA mode, we'll have multiple masters running at the same time. But, there's only one master leader, which actively serving scheduling requests. Once this master leader crashes, other masters would compete for the leader and only one master is guaranteed to be elected as new master leader, which would reconstruct the state from the original master leader and continute to serve scheduling requests.

### Related Issues

#2828 firstly introduces the bug of *duplicate Worker registration*, and #3447 fixed it. But there're still corner cases(see SPARK-23191 for details) where #3447 can not cover it:

* CASE 1
(1) Initially, Worker registered with Master A.
(2) After a while, the connection channel between Master A and Worker becomes inactive(e.g. due to network drop), and Worker is notified about that by calling `onDisconnected` from NettyRpcEnv
(3) When Worker invokes `onDisconnected`, then, it will attempt to reconnect to all masters(including Master A)
(4) At the meanwhile, network between Worker and Master A recover,  Worker successfully register to Master A again
(5) Master A response with `RegisterWorkerFailed("Duplicate worker ID")`
(6) Worker receives that msg, exit

* CASE 2
(1) Master A lost leadership(sends `RevokedLeadership` to itself). Master B takes over and recovery everything from master A(which would  register workers for the first time in Master B) and sends `MasterChanged` to Worker
(2) Before Master A receives `RevokedLeadership`, it receives a late `HeartBeat` from Worker(which had been removed in Master A due to heartbeat timeout previously), so it sends `ReconnectWorker`  to worker
(3) Worker receives `MasterChanged` before `ReconnectWorker` , changing masterRef to Master B
(4) Subsequently, Worker receives `ReconnectWorker` from Master A, then it reconnects to all masters
(5) Master B receives register request again from the Worker,  response with `RegisterWorkerFailed("Duplicate worker ID")`
(6) Worker receives that msg, exit

In CASE 1, it is difficult for the Worker to know Master A's state. Normally, Worker thinks Master A has already died and is impossible that Master A would response with Worker's re-connect request.

In CASE 2, we can see race condition between `RevokedLeadership` and `HeartBeat`. Actually, Master A has already been revoked leadership while processing `HeartBeat` msg. That's means the state between Master and Zookeeper could be out of sync for a while.

### Solutions

In this PR, instead of exiting Worker process when *duplicate Worker registration* happens, we suggest to log warn about it. This would be fine since Master actually perform no-op when it receives duplicate registration from a Worker. In turn, Worker could continue living with that Master normally without any side effect.

## How was this patch tested?

Tested Manually.

I followed the steps as  Neeraj Gupta suggested in JIRA SPARK-23191 to reproduce the case 1.

Before this pr, Worker would be DEAD from UI.
After this pr, Worker just warn the duplicate register behavior (as you can see the second last row in log snippet below), and still be ALIVE from UI.

```
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: wuyi.local:7077 Disassociated !
19/05/09 20:58:32 INFO Worker: Connecting to master wuyi.local:7077...
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: Not spawning another attempt to register with the master, since there is an attempt scheduled already.
19/05/09 20:58:37 WARN TransportClientFactory: DNS resolution for wuyi.local/127.0.0.1:7077 took 5005 ms
19/05/09 20:58:37 INFO TransportClientFactory: Found inactive connection to wuyi.local/127.0.0.1:7077, creating a new one.
19/05/09 20:58:37 INFO TransportClientFactory: Successfully created connection to wuyi.local/127.0.0.1:7077 after 3 ms (0 ms spent in bootstraps)
19/05/09 20:58:37 WARN Worker: Duplicate registration at master spark://wuyi.local:7077
19/05/09 20:58:37 INFO Worker: Successfully registered with master spark://wuyi.local:7077
```

Closes #24569 from Ngone51/fix-worker-dup-register-error.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
wuyi 2019-05-28 11:59:29 +08:00 committed by Wenchen Fan
parent 29e154b2f1
commit 826ee6074e
4 changed files with 20 additions and 9 deletions

View file

@ -96,11 +96,13 @@ private[deploy] object DeployMessages {
* @param masterWebUiUrl the master Web UI address
* @param masterAddress the master address used by the worker to connect. It should be
* [[RegisterWorker.masterAddress]].
* @param duplicate whether it is a duplicate register request from the worker
*/
case class RegisteredWorker(
master: RpcEndpointRef,
masterWebUiUrl: String,
masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
masterAddress: RpcAddress,
duplicate: Boolean) extends DeployMessage with RegisterWorkerResponse
case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse

View file

@ -250,13 +250,13 @@ private[deploy] class Master(
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
schedule()
} else {
val workerAddress = worker.endpoint.address

View file

@ -393,12 +393,21 @@ private[deploy] class Worker(
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
if (preferConfiguredMasterAddress) {
logInfo("Successfully registered with master " + masterAddress.toSparkURL)
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
val preferredMasterAddress = if (preferConfiguredMasterAddress) {
masterAddress.toSparkURL
} else {
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
masterRef.address.toSparkURL
}
// there're corner cases which we could hardly avoid duplicate worker registration,
// e.g. Master disconnect(maybe due to network drop) and recover immediately, see
// SPARK-23191 for more details.
if (duplicate) {
logWarning(s"Duplicate registration at master $preferredMasterAddress")
}
logInfo(s"Successfully registered with master $preferredMasterAddress")
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwardMessageScheduler.scheduleAtFixedRate(

View file

@ -71,7 +71,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
val appDesc = DeployTestUtils.createAppDesc()
val drivers = mutable.HashSet[String]()
override def receive: PartialFunction[Any, Unit] = {
case RegisteredWorker(masterRef, _, _) =>
case RegisteredWorker(masterRef, _, _, _) =>
masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
case LaunchDriver(driverId, desc) =>
drivers += driverId
@ -626,7 +626,7 @@ class MasterSuite extends SparkFunSuite
override val rpcEnv: RpcEnv = master.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case RegisteredWorker(_, _, masterAddress) =>
case RegisteredWorker(_, _, masterAddress, _) =>
receivedMasterAddress = masterAddress
}
})