[SPARK-36827][CORE] Improve the perf and memory usage of cleaning up stage UI data

### What changes were proposed in this pull request?

Improve the perf and memory usage of cleaning up stage UI data. The new code make copy of the essential fields(stage id, attempt id, completion time) to an array and determine which stage data and `RDDOperationGraphWrapper` needs to be clean based on it
### Why are the changes needed?

Fix the memory usage issue described in https://issues.apache.org/jira/browse/SPARK-36827

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add new unit test for the InMemoryStore.
Also, run a simple benchmark with
```
    val testConf = conf.clone()
      .set(MAX_RETAINED_STAGES, 1000)

    val listener = new AppStatusListener(store, testConf, true)
    val stages = (1 to 5000).map { i =>
      val s = new StageInfo(i, 0, s"stage$i", 4, Nil, Nil, "details1",
        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
      s.submissionTime = Some(i.toLong)
      s
    }
    listener.onJobStart(SparkListenerJobStart(4, time, Nil, null))
    val start = System.nanoTime()
    stages.foreach { s =>
      time +=1
      s.submissionTime = Some(time)
      listener.onStageSubmitted(SparkListenerStageSubmitted(s, new Properties()))
      s.completionTime = Some(time)
      listener.onStageCompleted(SparkListenerStageCompleted(s))
    }
    println(System.nanoTime() - start)
```

Before changes:
InMemoryStore: 1.2s

After changes:
InMemoryStore: 0.23s

Closes #34092 from gengliangwang/cleanStage.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
This commit is contained in:
Gengliang Wang 2021-09-24 17:24:18 +08:00
parent 52833018be
commit 7ac0a2c37b
4 changed files with 69 additions and 8 deletions

View file

@ -21,7 +21,8 @@ import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@ -1253,12 +1254,47 @@ private[spark] class AppStatusListener(
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}
private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
if (countToDelete <= 0L) {
return
private case class StageCompletionTime(
stageId: Int,
attemptId: Int,
completionTime: Long)
private def cleanupStagesWithInMemoryStore(countToDelete: Long): Seq[Array[Int]] = {
val stageArray = new ArrayBuffer[StageCompletionTime]()
val stageDataCount = new mutable.HashMap[Int, Int]()
kvstore.view(classOf[StageDataWrapper]).forEach { s =>
// Here we keep track of the total number of StageDataWrapper entries for each stage id.
// This will be used in cleaning up the RDDOperationGraphWrapper data.
if (stageDataCount.contains(s.info.stageId)) {
stageDataCount(s.info.stageId) += 1
} else {
stageDataCount(s.info.stageId) = 1
}
if (s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING) {
val candidate =
StageCompletionTime(s.info.stageId, s.info.attemptId, s.completionTime)
stageArray.append(candidate)
}
}
// As the completion time of a skipped stage is always -1, we will remove skipped stages first.
// This is safe since the job itself contains enough information to render skipped stages in the
// UI.
stageArray.sortBy(_.completionTime).take(countToDelete.toInt).map { s =>
val key = Array(s.stageId, s.attemptId)
kvstore.delete(classOf[StageDataWrapper], key)
stageDataCount(s.stageId) -= 1
// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
if (stageDataCount(s.stageId) == 0) {
kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
}
cleanupCachedQuantiles(key)
key
}.toSeq
}
private def cleanupStagesInKVStore(countToDelete: Long): Seq[Array[Int]] = {
// As the completion time of a skipped stage is always -1, we will remove skipped stages first.
// This is safe since the job itself contains enough information to render skipped stages in the
// UI.
@ -1267,7 +1303,7 @@ private[spark] class AppStatusListener(
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}
val stageIds = stages.map { s =>
stages.map { s =>
val key = Array(s.info.stageId, s.info.attemptId)
kvstore.delete(s.getClass(), key)
@ -1294,6 +1330,22 @@ private[spark] class AppStatusListener(
cleanupCachedQuantiles(key)
key
}
}
private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
if (countToDelete <= 0L) {
return
}
// SPARK-36827: For better performance and avoiding OOM, here we use a optimized method for
// cleaning the StageDataWrapper and RDDOperationGraphWrapper data if Spark is
// using InMemoryStore.
val stageIds = if (kvstore.usingInMemoryStore) {
cleanupStagesWithInMemoryStore(countToDelete)
} else {
cleanupStagesInKVStore(countToDelete)
}
// Delete summaries in one pass, as deleting them for each stage is slow
kvstore.removeAllByIndexValues(classOf[ExecutorStageSummaryWrapper], "stage", stageIds)

View file

@ -191,6 +191,8 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
}
}
def usingInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore]
private case class Trigger[T](
threshold: Long,
action: Long => Unit)

View file

@ -95,7 +95,7 @@ private[spark] class StageDataWrapper(
private def active: Boolean = info.status == StageStatus.ACTIVE
@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}
/**

View file

@ -37,6 +37,7 @@ import org.apache.spark.status.ListenerEventsTestHelper._
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
private val conf = new SparkConf()
@ -50,10 +51,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
private var store: ElementTrackingStore = _
private var taskIdTracker = -1L
protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName())
before {
time = 0L
testDir = Utils.createTempDir()
store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf)
store = new ElementTrackingStore(createKVStore, conf)
taskIdTracker = -1L
}
@ -1872,3 +1875,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite {
override def createKVStore: KVStore = new InMemoryStore()
}