[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

## What changes were proposed in this pull request?
1. Deprecate `attemptId` in `StageInfo` and add `def attemptNumber() = attemptId` (see the sketch below).
2. Replace usages of `stageAttemptId` with `stageAttemptNumber`.
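
For context, a minimal sketch of the pattern, assuming a stripped-down `StageInfo` (the real class has many more constructor parameters): the deprecated field stays for source and binary compatibility, and new code reads the attempt through `attemptNumber()`.

```scala
// Sketch only: simplified stand-in for org.apache.spark.scheduler.StageInfo.
class StageInfo(
    val stageId: Int,
    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
    val name: String) {

  // New accessor; by definition it always agrees with the deprecated field.
  def attemptNumber(): Int = attemptId
}

object MigrationExample {
  def describe(info: StageInfo): String =
    // Old style (still compiles, but now emits a deprecation warning):
    //   s"stage ${info.stageId}, attempt ${info.attemptId}"
    // New style:
    s"stage ${info.stageId}, attempt ${info.attemptNumber()}"
}
```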

## How was this patch tested?
I manually checked the deprecation warnings reported by the compiler.

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20178 from advancedxy/SPARK-22952.
Authored by Xianjin YE on 2018-01-08 23:49:07 +08:00; committed by Wenchen Fan
Commit 40b983c3b4 (parent eb45b52e82)
9 changed files with 51 additions and 41 deletions

@ -815,7 +815,8 @@ class DAGScheduler(
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
- val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+ val stageAttemptId =
+ stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
@ -1050,7 +1051,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
stage.pendingPartitions += id
- new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+ new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
@ -1060,7 +1061,7 @@ class DAGScheduler(
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
- new ResultTask(stage.id, stage.latestInfo.attemptId,
+ new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
@ -1076,7 +1077,7 @@ class DAGScheduler(
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
+ tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
@ -1245,7 +1246,7 @@ class DAGScheduler(
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
- if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+ if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
// This task was for the currently running attempt of the stage. Since the task
// completed successfully from the perspective of the TaskSetManager, mark it as
// no longer pending (the TaskSetManager may consider the task complete even
@ -1324,10 +1325,10 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
- if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
+ if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
} else {
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is

@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
- val attemptId: Int,
+ @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@ -56,6 +56,8 @@ class StageInfo(
completionTime = Some(System.currentTimeMillis)
}
+ def attemptNumber(): Int = attemptId
private[spark] def getStatusString: String = {
if (completionTime.isDefined) {
if (failureReason.isDefined) {
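
The hunk above adds the new accessor itself; for listener code outside Spark core the migration is mechanical. A hypothetical listener for illustration (the `SparkListener` and `SparkListenerStageCompleted` types are the existing public API; the class below is not part of this commit):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Illustrative only: reads the attempt via the new accessor instead of the
// deprecated attemptId field.
class AttemptLoggingListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // Before: info.attemptId (still works, but deprecated since 2.3.0)
    println(s"stage ${info.stageId} completed attempt ${info.attemptNumber()}")
  }
}
```

Such a listener is registered as usual, e.g. with `SparkContext.addSparkListener`.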

@ -79,7 +79,7 @@ class StatsReportListener extends SparkListener with Logging {
x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
).getOrElse("-")
s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
s"Stage(${info.stageId}, ${info.attemptNumber}); Name: '${info.name}'; " +
s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
s"Took: $timeTaken msec"
}

@ -529,7 +529,8 @@ private[spark] class AppStatusListener(
}
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
- val maybeStage = Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)))
+ val maybeStage =
+ Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
maybeStage.foreach { stage =>
val now = System.nanoTime()
stage.info = event.stageInfo
@ -785,7 +786,7 @@ private[spark] class AppStatusListener(
}
private def getOrCreateStage(info: StageInfo): LiveStage = {
- val stage = liveStages.computeIfAbsent((info.stageId, info.attemptId),
+ val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
new Function[(Int, Int), LiveStage]() {
override def apply(key: (Int, Int)): LiveStage = new LiveStage()
})
@ -912,7 +913,7 @@ private[spark] class AppStatusListener(
private def cleanupTasks(stage: LiveStage): Unit = {
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
- val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+ val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
.last(stageKey)

@ -412,14 +412,14 @@ private class LiveStage extends LiveEntity {
def executorSummary(executorId: String): LiveExecutorStageSummary = {
executorSummaries.getOrElseUpdate(executorId,
- new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
+ new LiveExecutorStageSummary(info.stageId, info.attemptNumber, executorId))
}
def toApi(): v1.StageData = {
new v1.StageData(
status,
info.stageId,
- info.attemptId,
+ info.attemptNumber,
info.numTasks,
activeTasks,

@ -116,7 +116,7 @@ private[spark] object RDDOperationGraph extends Logging {
// Use a special prefix here to differentiate this cluster from other operation clusters
val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
val stageClusterName = s"Stage ${stage.stageId}" +
- { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+ { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
var rootNodeCount = 0

@ -263,7 +263,7 @@ private[spark] object JsonProtocol {
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
("Stage ID" -> stageInfo.stageId) ~
("Stage Attempt ID" -> stageInfo.attemptId) ~
("Stage Attempt ID" -> stageInfo.attemptNumber) ~
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~

@ -195,7 +195,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val s1Tasks = createTasks(4, execIds)
s1Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId,
+ stages.head.attemptNumber,
+ task))
}
assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
@ -213,10 +215,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
check[TaskDataWrapper](task.taskId) { wrapper =>
assert(wrapper.info.taskId === task.taskId)
assert(wrapper.stageId === stages.head.stageId)
- assert(wrapper.stageAttemptId === stages.head.attemptId)
- assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId)))
+ assert(wrapper.stageAttemptId === stages.head.attemptNumber)
+ assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptNumber)))
- val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger,
+ val runtime = Array[AnyRef](stages.head.stageId: JInteger,
+ stages.head.attemptNumber: JInteger,
-1L: JLong)
assert(Arrays.equals(wrapper.runtime, runtime))
@ -237,7 +240,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
Some(1L), None, true, false, None)
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
task.executorId,
- Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
+ Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
}
check[StageDataWrapper](key(stages.head)) { stage =>
@ -254,12 +257,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Fail one of the tasks, re-start it.
time += 1
s1Tasks.head.markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", TaskResultLost, s1Tasks.head, null))
time += 1
val reattempt = newAttempt(s1Tasks.head, nextTaskId())
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
reattempt))
assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
@ -289,7 +292,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val killed = s1Tasks.drop(1).head
killed.finishTime = time
killed.failed = true
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", TaskKilled("killed"), killed, null))
check[JobDataWrapper](1) { job =>
@ -311,13 +314,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
val denied = newAttempt(killed, nextTaskId())
val denyReason = TaskCommitDenied(1, 1, 1)
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
denied))
time += 1
denied.finishTime = time
denied.failed = true
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", denyReason, denied, null))
check[JobDataWrapper](1) { job =>
@ -337,7 +340,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Start a new attempt.
val reattempt2 = newAttempt(denied, nextTaskId())
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
reattempt2))
// Succeed all tasks in stage 1.
@ -350,7 +353,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
pending.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", Success, task, s1Metrics))
}
@ -414,13 +417,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
val s2Tasks = createTasks(4, execIds)
s2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId,
+ stages.last.attemptNumber,
+ task))
}
time += 1
s2Tasks.foreach { task =>
task.markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber,
"taskType", TaskResultLost, task, null))
}
@ -455,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// - Re-submit stage 2, all tasks, and succeed them and the stage.
val oldS2 = stages.last
- val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+ val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks,
oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
time += 1
@ -466,14 +471,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val newS2Tasks = createTasks(4, execIds)
newS2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task))
}
time += 1
newS2Tasks.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
- task, null))
+ listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType",
+ Success, task, null))
}
time += 1
@ -522,14 +527,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val j2s2Tasks = createTasks(4, execIds)
j2s2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId,
+ j2Stages.last.attemptNumber,
task))
}
time += 1
j2s2Tasks.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptNumber,
"taskType", Success, task, null))
}
@ -919,13 +925,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
val tasks = createTasks(2, Array("1"))
tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
// Start a 3rd task. The finished tasks should be deleted.
createTasks(1, Array("1")).foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
@ -934,7 +940,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Start a 4th task. The first task should be deleted, even if it's still running.
createTasks(1, Array("1")).foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
@ -960,7 +966,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
- private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+ private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]

@ -99,7 +99,7 @@ class SQLAppStatusListener(
// Reset the metrics tracking object for the new attempt.
Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
metrics.taskMetrics.clear()
- metrics.attemptId = event.stageInfo.attemptId
+ metrics.attemptId = event.stageInfo.attemptNumber
}
}