Style fixes as per Reynold's review
This commit is contained in:
parent
8c81068e16
commit
0c71ffe924
|
@ -361,8 +361,8 @@ class DAGScheduler(
|
||||||
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
|
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
|
||||||
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
|
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
|
||||||
val parents = getParentStages(s.rdd, jobId)
|
val parents = getParentStages(s.rdd, jobId)
|
||||||
val parentsWithoutThisJobId = parents.filter(
|
val parentsWithoutThisJobId = parents.filter(p =>
|
||||||
p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
|
!stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
|
||||||
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
|
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -395,8 +395,8 @@ class DAGScheduler(
|
||||||
running -= s
|
running -= s
|
||||||
}
|
}
|
||||||
stageToInfos -= s
|
stageToInfos -= s
|
||||||
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(
|
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
|
||||||
shuffleToMapStage.remove)
|
shuffleToMapStage.remove(shuffleId))
|
||||||
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
|
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
|
||||||
logDebug("Removing pending status for stage %d".format(stageId))
|
logDebug("Removing pending status for stage %d".format(stageId))
|
||||||
}
|
}
|
||||||
|
@ -573,8 +573,8 @@ class DAGScheduler(
|
||||||
case JobGroupCancelled(groupId) =>
|
case JobGroupCancelled(groupId) =>
|
||||||
// Cancel all jobs belonging to this job group.
|
// Cancel all jobs belonging to this job group.
|
||||||
// First finds all active jobs with this group id, and then kill stages for them.
|
// First finds all active jobs with this group id, and then kill stages for them.
|
||||||
val activeInGroup = activeJobs.filter(
|
val activeInGroup = activeJobs.filter(activeJob =>
|
||||||
groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
|
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
|
||||||
val jobIds = activeInGroup.map(_.jobId)
|
val jobIds = activeInGroup.map(_.jobId)
|
||||||
jobIds.foreach { handleJobCancellation }
|
jobIds.foreach { handleJobCancellation }
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue