[SPARK-34142][CORE] Support Fallback Storage Cleanup during stopping SparkContext

### What changes were proposed in this pull request?

This PR aims to support fallback storage clean-up when `SparkContext` is stopped.

### Why are the changes needed?

SPARK-33545 added `Support Fallback Storage during worker decommission` for managed cloud storages with TTL support, where the TTL is typically one day. This PR adds an additional clean-up step while stopping `SparkContext`, which saves storage cost before the TTL expires and also covers HDFS-compatible storages that have no TTL support.
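
For illustration, here is a minimal sketch of how an application might opt in to the new clean-up. The bucket path and application name are placeholders; the clean-up only runs when both `spark.storage.decommission.fallbackStorage.path` and the new `spark.storage.decommission.fallbackStorage.cleanUp` flag are set.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fallback-cleanup-example") // placeholder app name
  // Fallback location for migrated shuffle blocks; must end with a separator.
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
  // New flag from this PR (default: false): delete <path>/<appId> on shutdown.
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

val sc = new SparkContext(conf)
// ... run jobs; decommissioned executors may migrate shuffle data to the fallback path ...
sc.stop() // FallbackStorage.cleanUp() removes the per-application fallback directory here
```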

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new opt-in feature: the clean-up runs only when the new `spark.storage.decommission.fallbackStorage.cleanUp` configuration is set to `true` (default: `false`).

### How was this patch tested?

Pass the newly added UT.
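
For reference, in a Spark source checkout the new test can be run with something like `build/sbt "core/testOnly *FallbackStorageSuite"`.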

Closes #31215 from dongjoon-hyun/SPARK-34142.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
commit 415506cc04 (parent 098f2268e4)
Dongjoon Hyun authored on 2021-01-17 16:54:01 -08:00; committed by Dongjoon Hyun
4 changed files with 51 additions and 3 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

@@ -2096,6 +2096,7 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _plugins.foreach(_.shutdown())
     }
+    FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
     Utils.tryLogNonFatalError {
       _eventLogger.foreach(_.stop())
     }

core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -481,6 +481,13 @@ package object config {
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
+      .doc("If true, Spark cleans up its fallback storage data during shutting down.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

@@ -23,12 +23,13 @@ import java.nio.ByteBuffer
 import scala.concurrent.Future
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}

@@ -119,6 +120,27 @@ object FallbackStorage extends Logging {
     }
   }
 
+  /** Clean up the generated fallback location for this app. */
+  def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
+        conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
+        conf.contains("spark.app.id")) {
+      val fallbackPath =
+        new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
+      val fallbackUri = fallbackPath.toUri
+      val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
+      // The fallback directory for this app may not be created yet.
+      if (fallbackFileSystem.exists(fallbackPath)) {
+        if (fallbackFileSystem.delete(fallbackPath, true)) {
+          logInfo(s"Succeed to clean up: $fallbackUri")
+        } else {
+          // Clean-up can fail due to the permission issues.
+          logWarning(s"Failed to clean up: $fallbackUri")
+        }
+      }
+    }
+  }
+
   /** Report block status to block manager master and map output tracker master. */
   private def reportBlockStatus(blockManager: BlockManager, blockId: BlockId, dataLength: Long) = {
     assert(blockManager.master != null)

@@ -171,4 +193,3 @@
     }
   }
 }

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

@@ -16,11 +16,12 @@
  */
 
 package org.apache.spark.storage
 
-import java.io.{DataOutputStream, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
 import java.nio.file.Files
 
 import scala.concurrent.duration._
 
+import org.apache.hadoop.conf.Configuration
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

@@ -106,6 +107,24 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
   }
 
+  test("SPARK-34142: fallback storage API - cleanUp") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { cleanUp =>
+        val appId = s"test$cleanUp"
+        val conf = new SparkConf(false)
+          .set("spark.app.id", appId)
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)
+        val location = new File(dir, appId)
+        assert(location.mkdir())
+        assert(location.exists())
+        FallbackStorage.cleanUp(conf, new Configuration())
+        assert(location.exists() != cleanUp)
+      }
+    }
+  }
+
   test("migrate shuffle data to fallback storage") {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")