[SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

### What changes were proposed in this pull request?
There is a scenario in which the current code may cause a data correctness issue:
  1. A task writing data commits its task output successfully, but the container is killed right after the commit, so the task is marked as failed while its data file still remains under the committed task path.
  2. The DAGScheduler calls `handleCompletedTask` and then `outputCommitCoordinator.taskCompleted`, and the OutputCommitCoordinator removes the commit lock for the failed task's partition.
  3. The failed task is rerun with a new attempt. For the new attempt, `outputCommitCoordinator.canCommit()` returns true because the lock for this partition has been removed, so it commits its task output successfully and the task finally succeeds.
  4. Two files now remain under the job's attempt path for the same partition.
  5. `commitJob` commits the data of both committed task paths.
  6. As a result, the data is duplicated.
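
The pre-fix race can be illustrated directly against the coordinator, in the style of the existing unit tests. This is a minimal sketch of the behavior *before* this PR (it would no longer hold after this change), assuming `outputCommitCoordinator` is the driver-side coordinator instance as obtained in `OutputCommitCoordinatorSuite`; the stage, partition, and attempt numbers are illustrative:

```scala
import org.apache.spark.TaskKilled

val stage = 1
val stageAttempt = 0
val partition = 0

outputCommitCoordinator.stageStart(stage, maxPartitionId = 0)

// Attempt 0 is authorized and writes its output under the committed task path...
assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, attemptNumber = 0))

// ...but its container is killed afterwards, so the task is reported as failed.
outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
  attemptNumber = 0, reason = TaskKilled("container killed after commit"))

// Before this PR the coordinator cleared the lock here, so the retry attempt was
// authorized to commit again, leaving two committed task paths for the same partition.
assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, attemptNumber = 1))
```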

In this PR, we do the following:

1. When a task commits its output successfully, the executor sends a CommitOutputSuccess message to the OutputCommitCoordinator.
2. When the OutputCommitCoordinator handles `taskCompleted`, if the task failed but its commit succeeded, data duplication may happen, so we fail the stage (and hence the job).
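
After this change, the same sequence makes the coordinator ask the DAGScheduler to fail the stage instead of clearing the lock, so no later attempt can commit a second copy. A sketch of the expected behavior, under the same assumptions as the snippet above:

```scala
// Fresh stage number, same illustrative setup as above.
val stage2 = 2
outputCommitCoordinator.stageStart(stage2, maxPartitionId = 0)
assert(outputCommitCoordinator.canCommit(stage2, stageAttempt, partition, attemptNumber = 0))

// The authorized committer fails after it has already committed its output.
outputCommitCoordinator.taskCompleted(stage2, stageAttempt, partition,
  attemptNumber = 0, reason = TaskKilled("container killed after commit"))

// The coordinator now asks the DAGScheduler to fail the stage and keeps the commit lock,
// so no later attempt is authorized to commit for this partition.
assert(!outputCommitCoordinator.canCommit(stage2, stageAttempt, partition, attemptNumber = 1))
```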

### Why are the changes needed?
Fix a data duplication issue.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #36564 from AngersZhuuuu/SPARK-39195.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Branch: master
Commit: 29cf0c4121 (parent 42d33e1336)
Angerszhuuuu authored 2022-06-21 16:35:10 +08:00; committed by Wenchen Fan
7 changed files with 60 additions and 20 deletions

@@ -276,7 +276,12 @@ class SparkContext(config: SparkConf) extends Logging {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
SparkEnv.createDriverEnv(
conf,
isLocal,
listenerBus,
SparkContext.numDriverCores(master, conf),
this)
}
private[spark] def env: SparkEnv = _env

@@ -169,6 +169,7 @@ object SparkEnv extends Logging {
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
sparkContext: SparkContext,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -191,6 +192,7 @@ object SparkEnv extends Logging {
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
Option(sparkContext),
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
@@ -235,6 +237,7 @@ object SparkEnv extends Logging {
/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
// scalastyle:off argcount
private def create(
conf: SparkConf,
executorId: String,
@@ -245,7 +248,9 @@ object SparkEnv extends Logging {
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
sc: Option[SparkContext] = None,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// scalastyle:on argcount
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
@@ -391,7 +396,12 @@ object SparkEnv extends Logging {
}
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
if (isDriver) {
new OutputCommitCoordinator(conf, isDriver, sc)
} else {
new OutputCommitCoordinator(conf, isDriver)
}
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))

@@ -1175,6 +1175,13 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
}
private[scheduler] def handleStageFailed(
stageId: Int,
reason: String,
exception: Option[Throwable]): Unit = {
stageIdToStage.get(stageId).foreach { abortStage(_, reason, exception) }
}
private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -2608,6 +2615,13 @@ private[spark] class DAGScheduler(
runningStages -= stage
}
/**
* Called by the OutputCommitCoordinator to cancel stage due to data duplication may happen.
*/
private[scheduler] def stageFailed(stageId: Int, reason: String): Unit = {
eventProcessLoop.post(StageFailed(stageId, reason, None))
}
/**
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
* being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
@@ -2876,6 +2890,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case StageFailed(stageId, reason, exception) =>
dagScheduler.handleStageFailed(stageId, reason, exception)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

@@ -88,6 +88,10 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR
private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
extends DAGSchedulerEvent
private[scheduler]
case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])
extends DAGSchedulerEvent
private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
extends DAGSchedulerEvent

@@ -44,7 +44,10 @@ private case class AskPermissionToCommitOutput(
* This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
* for an extensive design discussion.
*/
private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
private[spark] class OutputCommitCoordinator(
conf: SparkConf,
isDriver: Boolean,
sc: Option[SparkContext] = None) extends Logging {
// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
@@ -155,9 +158,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
if (stageState.authorizedCommitters(partition) == taskId) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
s"but task commit success, data duplication may happen."))
}
}
}

@@ -19,9 +19,8 @@ package org.apache.spark.scheduler
import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.{Seconds, Span}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
/**
* Integration tests for the OutputCommitCoordinator.
@@ -45,13 +44,14 @@ class OutputCommitCoordinatorIntegrationSuite
sc = new SparkContext("local[2, 4]", "test", conf)
}
test("exception thrown in OutputCommitter.commitTask()") {
test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
val e = intercept[SparkException] {
withTempDir { tempDir =>
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
}
}
}.getCause.getMessage
assert(e.endsWith("failed; but task commit success, data duplication may happen."))
}
}

@@ -86,11 +86,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
outputCommitCoordinator =
spy(new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -187,12 +188,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
// A new task should now be allowed to become the authorized committer
assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
nonAuthorizedCommitter + 2))
// There can only be one authorized committer
// A new task should not be allowed to become stage failed because of potential data duplication
assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
nonAuthorizedCommitter + 3))
nonAuthorizedCommitter + 2))
}
test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,7 +224,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
// Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
// then fail the 1st attempt and make sure the 4th one can commit again.
// then fail the 1st attempt and since stage failed because of potential data duplication,
// make sure fail the 4th attempt.
stage += 1
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -235,7 +234,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
ExecutorLostFailure("0", exitCausedByApp = true, None))
assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
// A new task should not be allowed to become the authorized committer since stage failed
// because of potential data duplication
assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
}
test("SPARK-24589: Make sure stage state is cleaned up") {