[HOT FIX #6125] Do not wait for all stages to start rendering
zsxwing

Author: Andrew Or <andrew@databricks.com>

Closes #6138 from andrewor14/dag-viz-clean-properly and squashes the following commits:

19d4e98 [Andrew Or] Add synchronize
02542d6 [Andrew Or] Rename overloaded variable
d11bee1 [Andrew Or] Don't wait until all stages have started before rendering
This commit is contained in:
parent
d518c0369f
commit
2d4a961f82
|
@ -41,11 +41,11 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
|
||||||
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
|
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
|
||||||
|
|
||||||
/** Return the graph metadata for every stage of the given job, or an empty Seq if any is missing. */
|
/** Return the graph metadata for every stage of the given job, or an empty Seq if any is missing. */
|
||||||
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
|
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
|
||||||
val stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
|
val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
|
||||||
val graphs = stageIds.flatMap { sid => stageIdToGraph.get(sid) }
|
val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
|
||||||
// If the metadata for some stages has been removed, do not bother rendering this job
|
// If the metadata for some stages has been removed, do not bother rendering this job
|
||||||
if (stageIds.size != graphs.size) {
|
if (_stageIds.size != graphs.size) {
|
||||||
Seq.empty
|
Seq.empty
|
||||||
} else {
|
} else {
|
||||||
graphs
|
graphs
|
||||||
|
@ -53,16 +53,29 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return the graph metadata for the given stage, or None if no such information exists. */
|
/** Return the graph metadata for the given stage, or None if no such information exists. */
|
||||||
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
|
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = synchronized {
|
||||||
stageIdToGraph.get(stageId)
|
stageIdToGraph.get(stageId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
|
/** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
|
||||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
|
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
|
||||||
val jobId = jobStart.jobId
|
val jobId = jobStart.jobId
|
||||||
|
val stageInfos = jobStart.stageInfos
|
||||||
|
|
||||||
jobIds += jobId
|
jobIds += jobId
|
||||||
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
|
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
|
||||||
|
|
||||||
|
stageInfos.foreach { stageInfo =>
|
||||||
|
stageIds += stageInfo.stageId
|
||||||
|
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
|
||||||
|
// Remove state for old stages
|
||||||
|
if (stageIds.size >= retainedStages) {
|
||||||
|
val toRemove = math.max(retainedStages / 10, 1)
|
||||||
|
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
|
||||||
|
stageIds.trimStart(toRemove)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Remove state for old jobs
|
// Remove state for old jobs
|
||||||
if (jobIds.size >= retainedJobs) {
|
if (jobIds.size >= retainedJobs) {
|
||||||
val toRemove = math.max(retainedJobs / 10, 1)
|
val toRemove = math.max(retainedJobs / 10, 1)
|
||||||
|
@ -71,15 +84,4 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Remove graph metadata for old stages */
|
|
||||||
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
|
|
||||||
val stageInfo = stageSubmitted.stageInfo
|
|
||||||
stageIds += stageInfo.stageId
|
|
||||||
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
|
|
||||||
if (stageIds.size >= retainedStages) {
|
|
||||||
val toRemove = math.max(retainedStages / 10, 1)
|
|
||||||
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
|
|
||||||
stageIds.trimStart(toRemove)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ class RDDOperationGraphListenerSuite extends FunSuite {
|
||||||
assert(numStages > 0, "I will not run a job with 0 stages for you.")
|
assert(numStages > 0, "I will not run a job with 0 stages for you.")
|
||||||
val stageInfos = (0 until numStages).map { _ =>
|
val stageInfos = (0 until numStages).map { _ =>
|
||||||
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
|
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
|
||||||
listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
|
|
||||||
stageIdCounter += 1
|
stageIdCounter += 1
|
||||||
stageInfo
|
stageInfo
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue