From 7ac0a2c37b40066ad9b790dd7f070e23dba886e8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 24 Sep 2021 17:24:18 +0800 Subject: [PATCH] [SPARK-36827][CORE] Improve the perf and memory usage of cleaning up stage UI data ### What changes were proposed in this pull request? Improve the perf and memory usage of cleaning up stage UI data. The new code makes a copy of the essential fields (stage id, attempt id, completion time) to an array and determines which stage data and `RDDOperationGraphWrapper` entries need to be cleaned based on it ### Why are the changes needed? Fix the memory usage issue described in https://issues.apache.org/jira/browse/SPARK-36827 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a new unit test for the InMemoryStore. Also, run a simple benchmark with ``` val testConf = conf.clone() .set(MAX_RETAINED_STAGES, 1000) val listener = new AppStatusListener(store, testConf, true) val stages = (1 to 5000).map { i => val s = new StageInfo(i, 0, s"stage$i", 4, Nil, Nil, "details1", resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) s.submissionTime = Some(i.toLong) s } listener.onJobStart(SparkListenerJobStart(4, time, Nil, null)) val start = System.nanoTime() stages.foreach { s => time +=1 s.submissionTime = Some(time) listener.onStageSubmitted(SparkListenerStageSubmitted(s, new Properties())) s.completionTime = Some(time) listener.onStageCompleted(SparkListenerStageCompleted(s)) } println(System.nanoTime() - start) ``` Before changes: InMemoryStore: 1.2s After changes: InMemoryStore: 0.23s Closes #34092 from gengliangwang/cleanStage. 
Authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../spark/status/AppStatusListener.scala | 64 +++++++++++++++++-- .../spark/status/ElementTrackingStore.scala | 2 + .../org/apache/spark/status/storeTypes.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 9 ++- 4 files changed, 69 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7c403c4ec9..f9aaa7f9cc 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -21,7 +21,8 @@ import java.util.Date import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap +import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable import org.apache.spark._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} @@ -1253,12 +1254,47 @@ private[spark] class AppStatusListener( toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } } - private def cleanupStages(count: Long): Unit = { - val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES)) - if (countToDelete <= 0L) { - return + private case class StageCompletionTime( + stageId: Int, + attemptId: Int, + completionTime: Long) + + private def cleanupStagesWithInMemoryStore(countToDelete: Long): Seq[Array[Int]] = { + val stageArray = new ArrayBuffer[StageCompletionTime]() + val stageDataCount = new mutable.HashMap[Int, Int]() + kvstore.view(classOf[StageDataWrapper]).forEach { s => + // Here we keep track of the total number of StageDataWrapper entries for each stage id. + // This will be used in cleaning up the RDDOperationGraphWrapper data. 
+ if (stageDataCount.contains(s.info.stageId)) { + stageDataCount(s.info.stageId) += 1 + } else { + stageDataCount(s.info.stageId) = 1 + } + if (s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING) { + val candidate = + StageCompletionTime(s.info.stageId, s.info.attemptId, s.completionTime) + stageArray.append(candidate) + } } + // As the completion time of a skipped stage is always -1, we will remove skipped stages first. + // This is safe since the job itself contains enough information to render skipped stages in the + // UI. + stageArray.sortBy(_.completionTime).take(countToDelete.toInt).map { s => + val key = Array(s.stageId, s.attemptId) + kvstore.delete(classOf[StageDataWrapper], key) + stageDataCount(s.stageId) -= 1 + // Check whether there are remaining attempts for the same stage. If there aren't, then + // also delete the RDD graph data. + if (stageDataCount(s.stageId) == 0) { + kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId) + } + cleanupCachedQuantiles(key) + key + }.toSeq + } + + private def cleanupStagesInKVStore(countToDelete: Long): Seq[Array[Int]] = { // As the completion time of a skipped stage is always -1, we will remove skipped stages first. // This is safe since the job itself contains enough information to render skipped stages in the // UI. 
@@ -1267,7 +1303,7 @@ private[spark] class AppStatusListener( s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING } - val stageIds = stages.map { s => val key = Array(s.info.stageId, s.info.attemptId) kvstore.delete(s.getClass(), key) @@ -1294,6 +1330,22 @@ private[spark] class AppStatusListener( cleanupCachedQuantiles(key) key } + } + + private def cleanupStages(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES)) + if (countToDelete <= 0L) { + return + } + + // SPARK-36827: For better performance and avoiding OOM, here we use an optimized method for + // cleaning the StageDataWrapper and RDDOperationGraphWrapper data if Spark is + // using InMemoryStore. + val stageIds = if (kvstore.usingInMemoryStore) { + cleanupStagesWithInMemoryStore(countToDelete) + } else { + cleanupStagesInKVStore(countToDelete) + } // Delete summaries in one pass, as deleting them for each stage is slow kvstore.removeAllByIndexValues(classOf[ExecutorStageSummaryWrapper], "stage", stageIds) diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 1b8dc9c827..c276f4f206 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -191,6 +191,8 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten } } + def usingInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] + private case class Trigger[T]( threshold: Long, action: Long => Unit) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index a34b1a5564..b258bdb25f 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -95,7 +95,7 
@@ private[spark] class StageDataWrapper( private def active: Boolean = info.status == StageStatus.ACTIVE @JsonIgnore @KVIndex("completionTime") - private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) + def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } /** diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 424b328ce3..b2d3e0f52b 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { private val conf = new SparkConf() @@ -50,10 +51,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { private var store: ElementTrackingStore = _ private var taskIdTracker = -1L + protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName()) + before { time = 0L testDir = Utils.createTempDir() - store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf) + store = new ElementTrackingStore(createKVStore, conf) taskIdTracker = -1L } @@ -1872,3 +1875,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + +class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite { + override def createKVStore: KVStore = new InMemoryStore() +}