From d4e32c896a6d3c71f61f96f0b4c5d98fc8730a21 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 6 Jun 2021 09:20:42 -0700
Subject: [PATCH] [SPARK-35654][CORE] Allow ShuffleDataIO control
 DiskBlockManager.deleteFilesOnStop

### What changes were proposed in this pull request?

This PR aims to change `DiskBlockManager` as follows, to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.

```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```

### Why are the changes needed?

`SparkContext`:
1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
2. loads `ShuffleDataIO`,
3. initializes the block manager.

```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)
...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
_shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
  _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
}
...
_env.blockManager.initialize(_applicationId)
...
```

`DiskBlockManager` is created first, in the `BlockManager` constructor, so `ShuffleDataIO` cannot change `deleteFilesOnStop` afterwards. By switching to `var`, we can implement an enhanced shuffle data management feature via `ShuffleDataIO`, like https://github.com/apache/spark/pull/32730 .

```scala
val diskBlockManager = {
  // Only perform cleanup if an external service is not serving our shuffle files.
  val deleteFilesOnStop =
    !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
  new DiskBlockManager(conf, deleteFilesOnStop)
}
```

### Does this PR introduce _any_ user-facing change?

No. This is a private class.

### How was this patch tested?

N/A

Closes #32784 from dongjoon-hyun/SPARK-35654.
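The ordering problem above can be illustrated with a minimal, self-contained sketch. The class and field names below are simplified stand-ins for Spark's real classes (they are not the actual Spark API): the manager is constructed early with a default deletion policy, and a later-loaded component can only adjust that policy because the flag is a `var`.

```scala
// Hypothetical stand-in for DiskBlockManager, illustrating why the
// deletion flag must be mutable: it is constructed before the shuffle
// component that decides the final policy is loaded.
class DiskBlockManager(var deleteFilesOnStop: Boolean) {
  var filesDeleted = false
  def stop(): Unit = {
    // In the real class this would remove the local shuffle directories.
    if (deleteFilesOnStop) filesDeleted = true
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Constructed first (inside BlockManager), defaulting to deletion on stop.
    val dbm = new DiskBlockManager(deleteFilesOnStop = true)
    // Later, a ShuffleDataIO implementation that manages shuffle data
    // externally flips the flag before shutdown.
    dbm.deleteFilesOnStop = false
    dbm.stop()
    println(s"filesDeleted=${dbm.filesDeleted}")
  }
}
```

With a `val` parameter, the second step would not compile, and the constructor-time default would always win.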
Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/storage/DiskBlockManager.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5db4965b67..f5ad4f9edb 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -32,8 +32,11 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
+ *
+ * ShuffleDataIO also can change the behavior of deleteFilesOnStop.
  */
-private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
+  extends Logging {

   private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)