[SPARK-24352][CORE][TESTS] De-flake StandaloneDynamicAllocationSuite blacklist test

The issue is that the test tried to stop an existing scheduler and replace it with
a new one set up for the test. That can cause issues because both were sharing the
same RpcEnv underneath, and unregistering RpcEndpoints is actually asynchronous
(see comment in Dispatcher.unregisterRpcEndpoint). So that could lead to races where
the new scheduler tried to register before the old one was fully unregistered.

The updated test avoids the issue by using a separate RpcEnv / scheduler instance
altogether, and also avoids a misleading NPE in the test logs.

Closes #25318 from vanzin/SPARK-24352.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Marcelo Vanzin 2019-07-31 17:44:20 -07:00 committed by Dongjoon Hyun
parent 70ef9064a8
commit b3ffd8be14

View file

@@ -493,28 +493,35 @@ class StandaloneDynamicAllocationSuite
}
test("executor registration on a blacklisted host must fail") {
// The context isn't really used by the test, but it helps with creating a test scheduler,
// since CoarseGrainedSchedulerBackend makes a lot of calls to the context instance.
sc = new SparkContext(appConf.set(config.BLACKLIST_ENABLED.key, "true"))
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty,
Map.empty)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty,
Map.empty, Map.empty)
// Get "localhost" on a blacklist.
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
when(taskScheduler.sc).thenReturn(sc)
sc.taskScheduler = taskScheduler
// Create a fresh scheduler backend to blacklist "localhost".
sc.schedulerBackend.stop()
val backend =
new StandaloneSchedulerBackend(taskScheduler, sc, Array(masterRpcEnv.address.toSparkURL))
backend.start()
backend.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager)
try {
val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
try {
scheduler.start()
scheduler.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
}
} finally {
scheduler.stop()
}
} finally {
rpcEnv.shutdown()
}
}