[SPARK-31208][CORE] Add an experimental cleanShuffleDependencies

### What changes were proposed in this pull request?

Add cleanShuffleDependencies as an experimental developer API that lets users clean up shuffle files more aggressively than Spark currently does.

### Why are the changes needed?

Dynamic scaling on Kubernetes (introduced in Spark 3) only shuts down executors that hold no shuffle files. However, Spark does not aggressively clean up shuffle files (see SPARK-5836); instead it relies on JVM GC on the driver to trigger the deletes. We already have a mechanism to explicitly clean up shuffle files in the ALS algorithm, which creates many quickly orphaned shuffle files. We should expose this as an advanced developer feature so that users can clean up shuffle files themselves, improving dynamic scaling of their jobs on Kubernetes.

### Does this PR introduce any user-facing change?

Yes, this adds a new experimental API: RDD.cleanShuffleDependencies(blocking: Boolean = false).
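
As an illustration, here is a minimal usage sketch (the app name, checkpoint directory, and RDD pipeline are hypothetical; the checkpoint-and-materialize step before cleanup follows the method's scaladoc, and the API is annotated @Since("3.1.0")):

```scala
import org.apache.spark.sql.SparkSession

object CleanShuffleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clean-shuffle-example").getOrCreate()
    val sc = spark.sparkContext
    sc.setCheckpointDir("/tmp/checkpoints") // hypothetical checkpoint location

    // An RDD whose lineage contains a shuffle (reduceByKey).
    val counts = sc.parallelize(1 to 1000)
      .map(x => (x % 20, 1))
      .reduceByKey(_ + _)

    // If the RDD will be reused afterwards, checkpoint and materialize it first
    // so its lineage no longer depends on the shuffle files about to be removed.
    counts.checkpoint()
    counts.count()

    // Eagerly remove the shuffle files behind this RDD's non-persisted lineage,
    // blocking until the cleanup has completed.
    counts.cleanShuffleDependencies(blocking = true)

    spark.stop()
  }
}
```

As the new scaladoc notes, this is mainly useful when running without an external shuffle service (for example, dynamic allocation on Kubernetes) and should be treated as an advanced, experimental knob.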

### How was this patch tested?

ALS already used a mechanism like this; this patch re-targets the ALS code to the new interface, relies on the existing ALS tests, and adds a new RDDCleanerSuite that exercises the RDD API directly.

Closes #28038 from holdenk/SPARK-31208-allow-users-to-cleanup-shuffle-files.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
5 changed files with 158 additions and 71 deletions


@@ -222,11 +222,15 @@ private[spark] class ContextCleaner(
/** Perform shuffle cleanup. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
} else {
logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
}
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}


@@ -23,6 +23,7 @@ import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
import scala.ref.WeakReference
import scala.reflect.{classTag, ClassTag}
import scala.util.hashing
@@ -243,6 +244,9 @@ abstract class RDD[T: ClassTag](
// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
@volatile private var dependencies_ : Seq[Dependency[_]] = _
// When we overwrite the dependencies we keep a weak reference to the old dependencies
// for user controlled cleanup.
@volatile @transient private var legacyDependencies: WeakReference[Seq[Dependency[_]]] = _
@volatile @transient private var partitions_ : Array[Partition] = _
/** An Option holding our checkpoint RDD, if we are checkpointed */
@@ -265,6 +269,25 @@
}
}
/**
* Get the list of dependencies of this RDD ignoring checkpointing.
*/
final private def internalDependencies: Option[Seq[Dependency[_]]] = {
if (legacyDependencies != null) {
legacyDependencies.get
} else if (dependencies_ != null) {
Some(dependencies_)
} else {
// This case should be infrequent.
stateLock.synchronized {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
Some(dependencies_)
}
}
}
/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
@@ -1701,6 +1724,40 @@ abstract class RDD[T: ClassTag](
}
}
/**
* :: Experimental ::
* Removes an RDD's shuffles and its non-persisted ancestors.
* When running without a shuffle service, cleaning up shuffle files enables downscaling.
* If you use the RDD after this call, you should checkpoint and materialize it first.
* If you are uncertain of what you are doing, please do not use this feature.
* Additional techniques for mitigating orphaned shuffle files:
* * Tuning the driver GC to be more aggressive, so the regular context cleaner is triggered
* * Setting an appropriate TTL for shuffle files to be auto cleaned
*/
@Experimental
@DeveloperApi
@Since("3.1.0")
def cleanShuffleDependencies(blocking: Boolean = false): Unit = {
sc.cleaner.foreach { cleaner =>
/**
* Clean the shuffles & all of its parents.
*/
def cleanEagerly(dep: Dependency[_]): Unit = {
if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) {
val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
cleaner.doCleanupShuffle(shuffleId, blocking)
}
val rdd = dep.rdd
val rddDepsOpt = rdd.internalDependencies
if (rdd.getStorageLevel == StorageLevel.NONE) {
rddDepsOpt.foreach(deps => deps.foreach(cleanEagerly))
}
}
internalDependencies.foreach(deps => deps.foreach(cleanEagerly))
}
}
/**
* :: Experimental ::
* Marks the current stage as a barrier stage, where Spark must launch all tasks together.
@@ -1840,6 +1897,7 @@ abstract class RDD[T: ClassTag](
* created from the checkpoint file, and forget its old dependencies and partitions.
*/
private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
legacyDependencies = new WeakReference(dependencies_)
clearDependencies()
partitions_ = null
deps = null // Forget the constructor argument for dependencies too


@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter
import org.scalatest.BeforeAndAfterEach
import org.apache.spark._
import org.apache.spark.util.Utils
class RDDCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {
override def beforeEach(): Unit = {
super.beforeEach()
// Once `Utils.getOrCreateLocalRootDirs` is called, it is cached in `Utils.localRootDirs`.
// Unless this is manually cleared before and after a test, it returns the same directory
// set before even if 'spark.local.dir' is configured afterwards.
Utils.clearLocalRootDirs()
}
override def afterEach(): Unit = {
Utils.clearLocalRootDirs()
super.afterEach()
}
test("RDD shuffle cleanup standalone") {
val conf = new SparkConf()
val localDir = Utils.createTempDir()
val checkpointDir = Utils.createTempDir()
def getAllFiles: Set[File] =
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Test checkpoint and clean parents
val input = sc.parallelize(1 to 1000)
val keyed = input.map(x => (x % 20, 1))
val shuffled = keyed.reduceByKey(_ + _)
val keysOnly = shuffled.keys
keysOnly.count()
assert(getAllFiles.size > 0)
keysOnly.cleanShuffleDependencies(true)
val resultingFiles = getAllFiles
assert(resultingFiles === Set())
// Ensure running count again works fine even if we kill the shuffle files.
assert(keysOnly.count() === 20)
} finally {
sc.stop()
}
} finally {
Utils.deleteRecursively(localDir)
Utils.deleteRecursively(checkpointDir)
}
}
}


@@ -1003,7 +1003,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
previousItemFactors.unpersist()
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
// TODO: Generalize PeriodicGraphCheckpointer and use it here.
val deps = itemFactors.dependencies
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint() // itemFactors gets materialized in computeFactors
}
@@ -1011,7 +1010,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
if (shouldCheckpoint(iter)) {
ALS.cleanShuffleDependencies(sc, deps)
itemFactors.cleanShuffleDependencies()
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
@@ -1024,10 +1023,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
userLocalIndexEncoder, solver = solver)
if (shouldCheckpoint(iter)) {
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
val deps = itemFactors.dependencies
itemFactors.checkpoint()
itemFactors.count() // checkpoint item factors and cut lineage
ALS.cleanShuffleDependencies(sc, deps)
itemFactors.cleanShuffleDependencies()
deletePreviousCheckpointFile()
previousCachedItemFactors.foreach(_.unpersist())
@@ -1815,31 +1813,4 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
* satisfies this requirement, we simply use a type alias here.
*/
private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner
/**
* Private function to clean up all of the shuffles files from the dependencies and their parents.
*/
private[spark] def cleanShuffleDependencies[T](
sc: SparkContext,
deps: Seq[Dependency[_]],
blocking: Boolean = false): Unit = {
// If there is no reference tracking we skip clean up.
sc.cleaner.foreach { cleaner =>
/**
* Clean the shuffles & all of its parents.
*/
def cleanEagerly(dep: Dependency[_]): Unit = {
if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) {
val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
cleaner.doCleanupShuffle(shuffleId, blocking)
}
val rdd = dep.rdd
val rddDeps = rdd.dependencies
if (rdd.getStorageLevel == StorageLevel.NONE && rddDeps != null) {
rddDeps.foreach(cleanEagerly)
}
}
deps.foreach(cleanEagerly)
}
}
}


@@ -984,49 +984,27 @@ class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {
super.afterEach()
}
test("ALS shuffle cleanup standalone") {
val conf = new SparkConf()
val localDir = Utils.createTempDir()
val checkpointDir = Utils.createTempDir()
def getAllFiles: Set[File] =
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Test checkpoint and clean parents
val input = sc.parallelize(1 to 1000)
val keyed = input.map(x => (x % 20, 1))
val shuffled = keyed.reduceByKey(_ + _)
val keysOnly = shuffled.keys
val deps = keysOnly.dependencies
keysOnly.count()
ALS.cleanShuffleDependencies(sc, deps, true)
val resultingFiles = getAllFiles
assert(resultingFiles === Set())
// Ensure running count again works fine even if we kill the shuffle files.
keysOnly.count()
} finally {
sc.stop()
}
} finally {
Utils.deleteRecursively(localDir)
Utils.deleteRecursively(checkpointDir)
}
}
test("ALS shuffle cleanup in algorithm") {
val conf = new SparkConf()
val localDir = Utils.createTempDir()
val checkpointDir = Utils.createTempDir()
def getAllFiles: Set[File] =
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
def getAllFiles: Set[File] = {
val files = FileUtils.listFiles(
localDir,
TrueFileFilter.INSTANCE,
TrueFileFilter.INSTANCE).asScala.toSet
files
}
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "ALSCleanerSuite", conf)
val pattern = "shuffle_(\\d+)_.+\\.data".r
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// There should be 0 shuffle files at the start
val initialIds = getAllFiles.flatMap { f =>
pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
assert(initialIds.size === 0)
// Generate test data
val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
// Implicitly test the cleaning of parents during ALS training
@@ -1044,7 +1022,6 @@ class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {
val resultingFiles = getAllFiles
// We expect the last shuffles files, block ratings, user factors, and item factors to be
// around but no more.
val pattern = "shuffle_(\\d+)_.+\\.data".r
val rddIds = resultingFiles.flatMap { f =>
pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
assert(rddIds.size === 4)