[SPARK-10102] [STREAMING] Fix a race condition that startReceiver may happen before setting trackerState to Started
Test failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/3305/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/ There is a race condition that setting `trackerState` to `Started` could happen after calling `startReceiver`. Then `startReceiver` won't start the receivers because it uses `! isTrackerStarted` to check if ReceiverTracker is stopping or stopped. But actually, `trackerState` is `Initialized` and will be changed to `Started` soon. Therefore, we should use `isTrackerStopping || isTrackerStopped`. Author: zsxwing <zsxwing@gmail.com> Closes #8294 from zsxwing/SPARK-9504.
This commit is contained in:
parent
1aeae05bb2
commit
90273eff96
|
@ -468,8 +468,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
|
|||
* Start a receiver along with its scheduled executors
|
||||
*/
|
||||
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
|
||||
def shouldStartReceiver: Boolean = {
|
||||
// It's okay to start when trackerState is Initialized or Started
|
||||
!(isTrackerStopping || isTrackerStopped)
|
||||
}
|
||||
|
||||
val receiverId = receiver.streamId
|
||||
if (!isTrackerStarted) {
|
||||
if (!shouldStartReceiver) {
|
||||
onReceiverJobFinish(receiverId)
|
||||
return
|
||||
}
|
||||
|
@ -494,14 +499,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
|
|||
// We will keep restarting the receiver job until ReceiverTracker is stopped
|
||||
future.onComplete {
|
||||
case Success(_) =>
|
||||
if (!isTrackerStarted) {
|
||||
if (!shouldStartReceiver) {
|
||||
onReceiverJobFinish(receiverId)
|
||||
} else {
|
||||
logInfo(s"Restarting Receiver $receiverId")
|
||||
self.send(RestartReceiver(receiver))
|
||||
}
|
||||
case Failure(e) =>
|
||||
if (!isTrackerStarted) {
|
||||
if (!shouldStartReceiver) {
|
||||
onReceiverJobFinish(receiverId)
|
||||
} else {
|
||||
logError("Receiver has been stopped. Try to restart it.", e)
|
||||
|
|
Loading…
Reference in a new issue