diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 49a319abb3..5723b0f690 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -96,11 +96,13 @@ private[deploy] object DeployMessages {
    * @param masterWebUiUrl the master Web UI address
    * @param masterAddress the master address used by the worker to connect. It should be
    *                      [[RegisterWorker.masterAddress]].
+   * @param duplicate whether it is a duplicate register request from the worker
    */
   case class RegisteredWorker(
       master: RpcEndpointRef,
       masterWebUiUrl: String,
-      masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
+      masterAddress: RpcAddress,
+      duplicate: Boolean) extends DeployMessage with RegisterWorkerResponse
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5db9b529a2..3c0a49e4ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -250,13 +250,13 @@ private[deploy] class Master(
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
+        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           workerRef, workerWebUiUrl)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
+          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
           schedule()
         } else {
           val workerAddress = worker.endpoint.address
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 974e54689c..b432febf51 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -393,12 +393,21 @@ private[deploy] class Worker(
 
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
     msg match {
-      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
-        if (preferConfiguredMasterAddress) {
-          logInfo("Successfully registered with master " + masterAddress.toSparkURL)
+      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
+        val preferredMasterAddress = if (preferConfiguredMasterAddress) {
+          masterAddress.toSparkURL
         } else {
-          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+          masterRef.address.toSparkURL
         }
+
+        // There are corner cases in which duplicate worker registration can hardly be
+        // avoided, e.g. the Master disconnects (perhaps due to a network drop) and
+        // recovers immediately; see SPARK-23191 for more details.
+        if (duplicate) {
+          logWarning(s"Duplicate registration at master $preferredMasterAddress")
+        }
+
+        logInfo(s"Successfully registered with master $preferredMasterAddress")
         registered = true
         changeMaster(masterRef, masterWebUiUrl, masterAddress)
         forwardMessageScheduler.scheduleAtFixedRate(
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 0c4b1058e2..f19e998946 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -71,7 +71,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   val appDesc = DeployTestUtils.createAppDesc()
   val drivers = mutable.HashSet[String]()
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, _, _) =>
+    case RegisteredWorker(masterRef, _, _, _) =>
      masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
     case LaunchDriver(driverId, desc) =>
       drivers += driverId
@@ -626,7 +626,7 @@ class MasterSuite extends SparkFunSuite
       override val rpcEnv: RpcEnv = master.rpcEnv
       override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredWorker(_, _, masterAddress) =>
+        case RegisteredWorker(_, _, masterAddress, _) =>
           receivedMasterAddress = masterAddress
       }
     })
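
For reviewers, here is a minimal, self-contained Scala sketch of the handshake this change introduces. `MiniMaster`, the simplified message types, and the standalone `handleRegisterResponse` below are illustrative stand-ins, not the actual `org.apache.spark.deploy` classes; only the message shape and the duplicate-handling logic mirror the diff.

```scala
// Sketch of the registration handshake after this change (stand-in types,
// not Spark's RpcEndpointRef/RpcAddress).
object DuplicateRegistrationSketch {

  sealed trait RegisterWorkerResponse
  // Mirrors the new message: the extra Boolean marks a duplicate request.
  case class RegisteredWorker(masterUrl: String, duplicate: Boolean)
    extends RegisterWorkerResponse
  case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse

  // Master side: a re-registration from a known worker id is now acknowledged
  // with duplicate = true instead of being rejected with RegisterWorkerFailed.
  class MiniMaster(url: String) {
    private val idToWorker = scala.collection.mutable.Set.empty[String]
    def register(workerId: String): RegisterWorkerResponse =
      if (idToWorker.contains(workerId)) {
        RegisteredWorker(url, duplicate = true)
      } else {
        idToWorker += workerId
        RegisteredWorker(url, duplicate = false)
      }
  }

  // Worker side: both branches end up registered; the duplicate case only
  // logs a warning, matching handleRegisterResponse in the diff above.
  def handleRegisterResponse(resp: RegisterWorkerResponse): Boolean = resp match {
    case RegisteredWorker(url, duplicate) =>
      if (duplicate) println(s"WARN: Duplicate registration at master $url")
      println(s"Successfully registered with master $url")
      true
    case RegisterWorkerFailed(msg) =>
      println(s"ERROR: Worker registration failed: $msg")
      false
  }

  def main(args: Array[String]): Unit = {
    val master = new MiniMaster("spark://master:7077")
    // SPARK-23191 scenario: the first ack is lost (e.g. a network drop), the
    // worker retries, and the master then sees a duplicate worker id.
    assert(handleRegisterResponse(master.register("worker-001"))) // duplicate = false
    assert(handleRegisterResponse(master.register("worker-001"))) // retry now succeeds
  }
}
```

The design point: a retried registration after a lost ack is benign, so the master now acknowledges it (with `duplicate = true` so the worker can log a warning) rather than replying `RegisterWorkerFailed`, which an unregistered worker treats as fatal.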