[SPARK-35654][CORE] Allow ShuffleDataIO control DiskBlockManager.deleteFilesOnStop

### What changes were proposed in this pull request?

This PR aims to change `DiskBlockManager` as follows so that `ShuffleDataIO` can decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```
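
In other words, the constructor parameter becomes a reassignable field. A minimal sketch of what this enables (hypothetical usage, not code from this PR):

```scala
// Hypothetical sketch: with `deleteFilesOnStop` as a `var`, the flag can be
// reassigned after the DiskBlockManager has been constructed.
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)

// Later, once a ShuffleDataIO plugin indicates that it manages shuffle data itself,
// deletion on stop can be disabled:
diskBlockManager.deleteFilesOnStop = false
```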

### Why are the changes needed?

`SparkContext` does the following in order:
1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
2. loads `ShuffleDataIO`,
3. initializes the block manager.
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)

...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
    }
...

_env.blockManager.initialize(_applicationId)
...
```

`DiskBlockManager` is created first, in the `BlockManager` constructor, so `ShuffleDataIO` cannot change `deleteFilesOnStop` later. By switching to a `var`, we can implement an enhanced shuffle data management feature via `ShuffleDataIO`, as in https://github.com/apache/spark/pull/32730 (see the sketch after the snippet below).
```scala
  val diskBlockManager = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }
```
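
As an illustration of the enhancement this unlocks, here is a hedged sketch (under assumptions, not code from this PR or #32730) of how `BlockManager.initialize` could flip the now-mutable flag based on a setting that a `ShuffleDataIO` plugin writes under `ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX` in step 2 above; the `keepShuffleFilesOnStop` key name is an invented placeholder:

```scala
// Hypothetical sketch inside BlockManager.initialize, after the ShuffleDataIO plugin
// has been loaded and its driver components have populated the conf (step 2 above).
// "keepShuffleFilesOnStop" is an invented placeholder key, not part of this PR.
val keepShuffleFiles = conf.getBoolean(
  ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "keepShuffleFilesOnStop", false)
if (keepShuffleFiles) {
  // The plugin manages shuffle data itself, so do not delete local files on stop.
  diskBlockManager.deleteFilesOnStop = false
}
```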

### Does this PR introduce _any_ user-facing change?

No. This is a private class.

### How was this patch tested?

N/A

Closes #32784 from dongjoon-hyun/SPARK-35654.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

```diff
@@ -32,8 +32,11 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
+ *
+ * ShuffleDataIO also can change the behavior of deleteFilesOnStop.
  */
-private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
+  extends Logging {
 
   private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
```