diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 7c403c4ec9..f9aaa7f9cc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -21,7 +21,8 @@ import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable
 
 import org.apache.spark._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -1253,12 +1254,47 @@ private[spark] class AppStatusListener(
     toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
   }
 
-  private def cleanupStages(count: Long): Unit = {
-    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
-    if (countToDelete <= 0L) {
-      return
+  private case class StageCompletionTime(
+      stageId: Int,
+      attemptId: Int,
+      completionTime: Long)
+
+  private def cleanupStagesWithInMemoryStore(countToDelete: Long): Seq[Array[Int]] = {
+    val stageArray = new ArrayBuffer[StageCompletionTime]()
+    val stageDataCount = new mutable.HashMap[Int, Int]()
+    kvstore.view(classOf[StageDataWrapper]).forEach { s =>
+      // Here we keep track of the total number of StageDataWrapper entries for each stage id.
+      // This will be used in cleaning up the RDDOperationGraphWrapper data.
+      if (stageDataCount.contains(s.info.stageId)) {
+        stageDataCount(s.info.stageId) += 1
+      } else {
+        stageDataCount(s.info.stageId) = 1
+      }
+      if (s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING) {
+        val candidate =
+          StageCompletionTime(s.info.stageId, s.info.attemptId, s.completionTime)
+        stageArray.append(candidate)
+      }
     }
 
+    // As the completion time of a skipped stage is always -1, we will remove skipped stages first.
+    // This is safe since the job itself contains enough information to render skipped stages in the
+    // UI.
+    stageArray.sortBy(_.completionTime).take(countToDelete.toInt).map { s =>
+      val key = Array(s.stageId, s.attemptId)
+      kvstore.delete(classOf[StageDataWrapper], key)
+      stageDataCount(s.stageId) -= 1
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      if (stageDataCount(s.stageId) == 0) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+      cleanupCachedQuantiles(key)
+      key
+    }.toSeq
+  }
+
+  private def cleanupStagesInKVStore(countToDelete: Long): Seq[Array[Int]] = {
     // As the completion time of a skipped stage is always -1, we will remove skipped stages first.
     // This is safe since the job itself contains enough information to render skipped stages in the
     // UI.
@@ -1267,7 +1303,7 @@ private[spark] class AppStatusListener(
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
     }
 
-    val stageIds = stages.map { s =>
+    stages.map { s =>
       val key = Array(s.info.stageId, s.info.attemptId)
       kvstore.delete(s.getClass(), key)
 
@@ -1294,6 +1330,22 @@ private[spark] class AppStatusListener(
       cleanupCachedQuantiles(key)
       key
     }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    // SPARK-36827: For better performance and avoiding OOM, here we use an optimized method for
+    //              cleaning the StageDataWrapper and RDDOperationGraphWrapper data if Spark is
+    //              using InMemoryStore.
+    val stageIds = if (kvstore.usingInMemoryStore) {
+      cleanupStagesWithInMemoryStore(countToDelete)
+    } else {
+      cleanupStagesInKVStore(countToDelete)
+    }
 
     // Delete summaries in one pass, as deleting them for each stage is slow
     kvstore.removeAllByIndexValues(classOf[ExecutorStageSummaryWrapper], "stage", stageIds)
diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
index 1b8dc9c827..c276f4f206 100644
--- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
@@ -191,6 +191,8 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
     }
   }
 
+  def usingInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore]
+
   private case class Trigger[T](
       threshold: Long,
       action: Long => Unit)
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index a34b1a5564..b258bdb25f 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -95,7 +95,7 @@ private[spark] class StageDataWrapper(
   private def active: Boolean = info.status == StageStatus.ACTIVE
 
   @JsonIgnore @KVIndex("completionTime")
-  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
+  def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 424b328ce3..b2d3e0f52b 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.status.ListenerEventsTestHelper._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
 class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   private val conf = new SparkConf()
@@ -50,10 +51,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   private var store: ElementTrackingStore = _
   private var taskIdTracker = -1L
 
+  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName())
+
   before {
     time = 0L
     testDir = Utils.createTempDir()
-    store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf)
+    store = new ElementTrackingStore(createKVStore, conf)
     taskIdTracker = -1L
   }
 
@@ -1872,3 +1875,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 }
+
+class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite {
+  override def createKVStore: KVStore = new InMemoryStore()
+}
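
For reference, the idea behind the InMemoryStore fast path can be sketched independently of Spark's types: a single scan over the stage entries builds both the list of completed attempts and a per-stage attempt count, then the oldest attempts are evicted, and a stage's RDD graph data is dropped only once its last attempt is gone. The following is a minimal, self-contained Scala sketch of that strategy, not the PR's code; StageAttempt, plan, finished, and retainedLimit are illustrative stand-ins for StageDataWrapper, cleanupStagesWithInMemoryStore, the ACTIVE/PENDING status check, and the retained-stages limit.

import scala.collection.mutable

// Stand-in for StageDataWrapper; completionTime is -1 for skipped stages.
case class StageAttempt(stageId: Int, attemptId: Int, completionTime: Long, finished: Boolean)

object StageCleanupSketch {
  // Returns the (stageId, attemptId) keys to delete, plus the stage ids whose
  // RDD graph data can be dropped too (no attempts left for that stage id).
  def plan(attempts: Seq[StageAttempt], retainedLimit: Int): (Seq[(Int, Int)], Seq[Int]) = {
    val attemptsPerStage = mutable.HashMap.empty[Int, Int]
    val completed = mutable.ArrayBuffer.empty[StageAttempt]
    // Single pass over the store: count attempts per stage id, collect completed ones.
    attempts.foreach { s =>
      attemptsPerStage(s.stageId) = attemptsPerStage.getOrElse(s.stageId, 0) + 1
      if (s.finished) completed += s
    }
    val countToDelete = math.max(attempts.size - retainedLimit, 0)
    val graphsToDelete = mutable.ArrayBuffer.empty[Int]
    // Sorting ascending puts skipped stages (completionTime == -1) first,
    // then the oldest completed attempts.
    val keys = completed.sortBy(_.completionTime).take(countToDelete).map { s =>
      attemptsPerStage(s.stageId) -= 1
      if (attemptsPerStage(s.stageId) == 0) graphsToDelete += s.stageId
      (s.stageId, s.attemptId)
    }
    (keys.toSeq, graphsToDelete.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (keys, graphs) = plan(
      Seq(
        StageAttempt(0, 0, -1L, finished = true), // skipped stage, evicted first
        StageAttempt(1, 0, 100L, finished = true),
        StageAttempt(2, 0, 200L, finished = true)),
      retainedLimit = 1)
    println(keys.toList)   // List((0,0), (1,0))
    println(graphs.toList) // List(0, 1)
  }
}

Evicting skipped stages first is safe for the same reason the diff's comment gives: the job itself carries enough information to render skipped stages in the UI.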
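On the test side, the change routes store creation through an overridable createKVStore, so AppStatusListenerWithInMemoryStoreSuite re-runs every test in AppStatusListenerSuite against InMemoryStore instead of the disk-backed store. Below is a generic, hedged sketch of that factory-method pattern using ScalaTest's AnyFunSuite; Backend, MapBackend, and BackendSuite are hypothetical names, not part of the PR.

import scala.collection.mutable
import org.scalatest.funsuite.AnyFunSuite

// Backend is a stand-in for KVStore; MapBackend plays the role of InMemoryStore.
trait Backend {
  def put(key: String, value: String): Unit
  def get(key: String): Option[String]
}

class MapBackend extends Backend {
  private val data = mutable.Map.empty[String, String]
  override def put(key: String, value: String): Unit = data(key) = value
  override def get(key: String): Option[String] = data.get(key)
}

// The base suite owns the tests; only the factory method varies per backend,
// mirroring createKVStore in AppStatusListenerSuite.
abstract class BackendSuite extends AnyFunSuite {
  protected def createBackend(): Backend

  test("round-trips a value") {
    val backend = createBackend()
    backend.put("k", "v")
    assert(backend.get("k").contains("v"))
  }
}

// Each concrete subclass re-runs every test in the base suite against its backend,
// just as AppStatusListenerWithInMemoryStoreSuite does with InMemoryStore.
class MapBackendSuite extends BackendSuite {
  override def createBackend(): Backend = new MapBackend()
}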