[SPARK-10058] [CORE] [TESTS] Fix the flaky tests in HeartbeatReceiverSuite
Fixed the test failure here: https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-1.5-SBT/116/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/testReport/junit/org.apache.spark/HeartbeatReceiverSuite/normal_heartbeat/
This failure is because `HeartbeatReceiverSuite. heartbeatReceiver` may receive `SparkListenerExecutorAdded("driver")` sent from [LocalBackend](8fb3a65cbb/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala (L121)
).
There are other race conditions in `HeartbeatReceiverSuite` because `HeartbeatReceiver.onExecutorAdded` and `HeartbeatReceiver.onExecutorRemoved` are asynchronous. This PR also fixed them.
Author: zsxwing <zsxwing@gmail.com>
Closes #8946 from zsxwing/SPARK-10058.
This commit is contained in:
parent
f21e2da03f
commit
9b3e7768a2
|
@ -20,6 +20,7 @@ package org.apache.spark
|
||||||
import java.util.concurrent.{ScheduledFuture, TimeUnit}
|
import java.util.concurrent.{ScheduledFuture, TimeUnit}
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
import scala.concurrent.Future
|
||||||
|
|
||||||
import org.apache.spark.executor.TaskMetrics
|
import org.apache.spark.executor.TaskMetrics
|
||||||
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
|
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
|
||||||
|
@ -147,11 +148,31 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send ExecutorRegistered to the event loop to add a new executor. Only for test.
|
||||||
|
*
|
||||||
|
* @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
|
||||||
|
* indicate if this operation is successful.
|
||||||
|
*/
|
||||||
|
def addExecutor(executorId: String): Option[Future[Boolean]] = {
|
||||||
|
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the heartbeat receiver is not stopped, notify it of executor registrations.
|
* If the heartbeat receiver is not stopped, notify it of executor registrations.
|
||||||
*/
|
*/
|
||||||
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
|
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
|
||||||
Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
|
addExecutor(executorAdded.executorId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send ExecutorRemoved to the event loop to remove a executor. Only for test.
|
||||||
|
*
|
||||||
|
* @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
|
||||||
|
* indicate if this operation is successful.
|
||||||
|
*/
|
||||||
|
def removeExecutor(executorId: String): Option[Future[Boolean]] = {
|
||||||
|
Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -165,7 +186,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
|
||||||
* and expire it with loud error messages.
|
* and expire it with loud error messages.
|
||||||
*/
|
*/
|
||||||
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
|
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
|
||||||
Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
|
removeExecutor(executorRemoved.executorId)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def expireDeadHosts(): Unit = {
|
private def expireDeadHosts(): Unit = {
|
||||||
|
|
|
@ -19,7 +19,10 @@ package org.apache.spark
|
||||||
|
|
||||||
import java.util.concurrent.{ExecutorService, TimeUnit}
|
import java.util.concurrent.{ExecutorService, TimeUnit}
|
||||||
|
|
||||||
|
import scala.collection.Map
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
|
|
||||||
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
|
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
|
||||||
|
@ -96,18 +99,18 @@ class HeartbeatReceiverSuite
|
||||||
|
|
||||||
test("normal heartbeat") {
|
test("normal heartbeat") {
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
|
addExecutorAndVerify(executorId1)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
|
addExecutorAndVerify(executorId2)
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
||||||
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
||||||
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
|
val trackedExecutors = getTrackedExecutors
|
||||||
assert(trackedExecutors.size === 2)
|
assert(trackedExecutors.size === 2)
|
||||||
assert(trackedExecutors.contains(executorId1))
|
assert(trackedExecutors.contains(executorId1))
|
||||||
assert(trackedExecutors.contains(executorId2))
|
assert(trackedExecutors.contains(executorId2))
|
||||||
}
|
}
|
||||||
|
|
||||||
test("reregister if scheduler is not ready yet") {
|
test("reregister if scheduler is not ready yet") {
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
|
addExecutorAndVerify(executorId1)
|
||||||
// Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
|
// Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = true)
|
triggerHeartbeat(executorId1, executorShouldReregister = true)
|
||||||
}
|
}
|
||||||
|
@ -116,20 +119,20 @@ class HeartbeatReceiverSuite
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
||||||
// Received heartbeat from unknown executor, so we ask it to re-register
|
// Received heartbeat from unknown executor, so we ask it to re-register
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = true)
|
triggerHeartbeat(executorId1, executorShouldReregister = true)
|
||||||
assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty)
|
assert(getTrackedExecutors.isEmpty)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("reregister if heartbeat from removed executor") {
|
test("reregister if heartbeat from removed executor") {
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
|
addExecutorAndVerify(executorId1)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
|
addExecutorAndVerify(executorId2)
|
||||||
// Remove the second executor but not the first
|
// Remove the second executor but not the first
|
||||||
heartbeatReceiver.onExecutorRemoved(SparkListenerExecutorRemoved(0, executorId2, "bad boy"))
|
removeExecutorAndVerify(executorId2)
|
||||||
// Now trigger the heartbeats
|
// Now trigger the heartbeats
|
||||||
// A heartbeat from the second executor should require reregistering
|
// A heartbeat from the second executor should require reregistering
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
||||||
triggerHeartbeat(executorId2, executorShouldReregister = true)
|
triggerHeartbeat(executorId2, executorShouldReregister = true)
|
||||||
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
|
val trackedExecutors = getTrackedExecutors
|
||||||
assert(trackedExecutors.size === 1)
|
assert(trackedExecutors.size === 1)
|
||||||
assert(trackedExecutors.contains(executorId1))
|
assert(trackedExecutors.contains(executorId1))
|
||||||
assert(!trackedExecutors.contains(executorId2))
|
assert(!trackedExecutors.contains(executorId2))
|
||||||
|
@ -138,8 +141,8 @@ class HeartbeatReceiverSuite
|
||||||
test("expire dead hosts") {
|
test("expire dead hosts") {
|
||||||
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
|
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
|
addExecutorAndVerify(executorId1)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
|
addExecutorAndVerify(executorId2)
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
||||||
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
||||||
// Advance the clock and only trigger a heartbeat for the first executor
|
// Advance the clock and only trigger a heartbeat for the first executor
|
||||||
|
@ -149,7 +152,7 @@ class HeartbeatReceiverSuite
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
|
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
|
||||||
// Only the second executor should be expired as a dead host
|
// Only the second executor should be expired as a dead host
|
||||||
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
|
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
|
||||||
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
|
val trackedExecutors = getTrackedExecutors
|
||||||
assert(trackedExecutors.size === 1)
|
assert(trackedExecutors.size === 1)
|
||||||
assert(trackedExecutors.contains(executorId1))
|
assert(trackedExecutors.contains(executorId1))
|
||||||
assert(!trackedExecutors.contains(executorId2))
|
assert(!trackedExecutors.contains(executorId2))
|
||||||
|
@ -175,8 +178,8 @@ class HeartbeatReceiverSuite
|
||||||
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
|
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
|
||||||
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
|
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
|
||||||
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
|
addExecutorAndVerify(executorId1)
|
||||||
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
|
addExecutorAndVerify(executorId2)
|
||||||
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
triggerHeartbeat(executorId1, executorShouldReregister = false)
|
||||||
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
triggerHeartbeat(executorId2, executorShouldReregister = false)
|
||||||
|
|
||||||
|
@ -222,6 +225,26 @@ class HeartbeatReceiverSuite
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def addExecutorAndVerify(executorId: String): Unit = {
|
||||||
|
assert(
|
||||||
|
heartbeatReceiver.addExecutor(executorId).map { f =>
|
||||||
|
Await.result(f, 10.seconds)
|
||||||
|
} === Some(true))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def removeExecutorAndVerify(executorId: String): Unit = {
|
||||||
|
assert(
|
||||||
|
heartbeatReceiver.removeExecutor(executorId).map { f =>
|
||||||
|
Await.result(f, 10.seconds)
|
||||||
|
} === Some(true))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getTrackedExecutors: Map[String, Long] = {
|
||||||
|
// We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from
|
||||||
|
// the map. See SPARK-10800.
|
||||||
|
heartbeatReceiver.invokePrivate(_executorLastSeen()).
|
||||||
|
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: use these classes to add end-to-end tests for dynamic allocation!
|
// TODO: use these classes to add end-to-end tests for dynamic allocation!
|
||||||
|
|
Loading…
Reference in a new issue