[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver

This fixes a bug where an executor that exits can cause the driver's OutputCommitCoordinator to stop. To fix this, we use an `isDriver` flag and check it in `stop()`.

See https://issues.apache.org/jira/browse/SPARK-7563 for more details.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6197 from JoshRosen/SPARK-7563 and squashes the following commits:

04b2cc5 [Josh Rosen] [SPARK-7563] OutputCommitCoordinator.stop() should only be executed on the driver

(cherry picked from commit 2c04c8a1ae)
Signed-off-by: Patrick Wendell <patrick@databricks.com>
This commit is contained in:
Josh Rosen 2015-05-15 18:06:01 -07:00 committed by Patrick Wendell
parent 6f78d03d2a
commit ed75cc02bc
3 changed files with 8 additions and 6 deletions

View file

@ -379,7 +379,7 @@ object SparkEnv extends Logging {
}
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))

View file

@ -38,7 +38,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem
* This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
* for an extensive design discussion.
*/
private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
@ -129,9 +129,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
}
def stop(): Unit = synchronized {
coordinatorRef.foreach(_ send StopCoordinator)
coordinatorRef = None
authorizedCommittersByStage.clear()
if (isDriver) {
coordinatorRef.foreach(_ send StopCoordinator)
coordinatorRef = None
authorizedCommittersByStage.clear()
}
}
// Marked private[scheduler] instead of private so this can be mocked in tests

View file

@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))