[SPARK-24340][CORE] Clean up non-shuffle disk block manager files following executor exits on a Standalone cluster

## What changes were proposed in this pull request?

Currently we only clean up the local directories when an application is removed. However, when executors die and restart repeatedly, many temp files are left untouched in the local directories, which is undesirable behavior and can gradually use up disk space.

We can detect executor death in the Worker and clean up the non-shuffle files (files not ending with ".index" or ".data") in the local directories; we should not touch the shuffle files, since they are expected to still be served by the external shuffle service.
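To make the mechanism concrete, here is a minimal sketch of the filtering and deletion logic (the class name `NonShuffleFileCleaner` is purely illustrative; the filter and the new `JavaUtils.deleteRecursively(File, FilenameFilter)` overload mirror the diff below):

```java
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

public class NonShuffleFileCleaner {
  // Keep shuffle index/data files: the external shuffle service may still serve them.
  private static final FilenameFilter NON_SHUFFLE_FILES =
      (dir, name) -> !name.endsWith(".index") && !name.endsWith(".data");

  public static void cleanLocalDirs(String[] localDirs) {
    for (String localDir : localDirs) {
      try {
        // Recursively delete everything that does not match the shuffle file suffixes.
        JavaUtils.deleteRecursively(new File(localDir), NON_SHUFFLE_FILES);
      } catch (IOException e) {
        // In the real code, failures are logged and the next directory is attempted anyway.
      }
    }
  }
}
```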

The scope of this PR is limited to implementing the cleanup logic on a Standalone cluster; we defer to experts familiar with other cluster managers (YARN/Mesos/K8s) to determine whether it's worth adding similar support.
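The behavior is gated by the new `spark.storage.cleanupFilesAfterExecutorExit` config (default `true`, documented under `SPARK_WORKER_OPTS` below). Assuming a standard standalone deployment, it could be disabled on a worker along these lines:

```
# conf/spark-env.sh on the worker (illustrative; any mechanism that sets the
# system property on the Worker JVM should work)
SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.storage.cleanupFilesAfterExecutorExit=false"
```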

## How was this patch tested?

Added a new test suite (`NonShuffleFilesCleanupSuite`) to cover the cleanup logic, plus new cases in `WorkerSuite`.
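Assuming the usual sbt project names for the affected modules, the new and updated suites can be run individually with something like:

```
build/sbt "network-shuffle/testOnly org.apache.spark.network.shuffle.NonShuffleFilesCleanupSuite"
build/sbt "core/testOnly org.apache.spark.deploy.worker.WorkerSuite"
```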

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21390 from jiangxb1987/cleanupNonshuffleFiles.
Xingbo Jiang 2018-06-01 13:46:05 -07:00 committed by Xiao Li
parent 09e78c1eaa
commit 8ef167a5f9
9 changed files with 400 additions and 20 deletions

View file

@@ -17,10 +17,7 @@
package org.apache.spark.network.util;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -91,11 +88,24 @@ public class JavaUtils {
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file) throws IOException {
deleteRecursively(file, null);
}
/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
*
* @param file Input file / dir to be deleted
* @param filter A filename filter that makes sure only files / dirs with matching filenames
* are deleted.
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
if (file == null) { return; }
// On Unix systems, use operating system command to run faster
// If that does not work out, fallback to the Java IO way
if (SystemUtils.IS_OS_UNIX) {
if (SystemUtils.IS_OS_UNIX && filter == null) {
try {
deleteRecursivelyUsingUnixNative(file);
return;
@@ -105,15 +115,17 @@ public class JavaUtils {
}
}
deleteRecursivelyUsingJavaIO(file);
deleteRecursivelyUsingJavaIO(file, filter);
}
private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
private static void deleteRecursivelyUsingJavaIO(
File file,
FilenameFilter filter) throws IOException {
if (file.isDirectory() && !isSymlink(file)) {
IOException savedIOException = null;
for (File child : listFilesSafely(file)) {
for (File child : listFilesSafely(file, filter)) {
try {
deleteRecursively(child);
deleteRecursively(child, filter);
} catch (IOException e) {
// In case of multiple exceptions, only last one will be thrown
savedIOException = e;
@@ -124,10 +136,13 @@ public class JavaUtils {
}
}
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
// Delete file only when it's a normal file or an empty directory.
if (file.isFile() || (file.isDirectory() && listFilesSafely(file, null).length == 0)) {
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
}
}
}
@@ -157,9 +172,9 @@
}
}
private static File[] listFilesSafely(File file) throws IOException {
private static File[] listFilesSafely(File file, FilenameFilter filter) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
File[] files = file.listFiles(filter);
if (files == null) {
throw new IOException("Failed to list files for dir: " + file);
}

View file

@@ -138,6 +138,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
/**
* Clean up any non-shuffle files in any local directories associated with a finished executor.
*/
public void executorRemoved(String executorId, String appId) {
blockManager.executorRemoved(executorId, appId);
}
/**
* Register an (application, executor) with the given shuffle info.
*

View file

@@ -211,6 +211,26 @@ public class ExternalShuffleBlockResolver {
}
}
/**
* Removes all the non-shuffle files in any local directories associated with the finished
* executor.
*/
public void executorRemoved(String executorId, String appId) {
logger.info("Clean up non-shuffle files associated with the finished executor {}", executorId);
AppExecId fullId = new AppExecId(appId, executorId);
final ExecutorShuffleInfo executor = executors.get(fullId);
if (executor == null) {
// Executor not registered, skip clean up of the local directories.
logger.info("Executor is not registered (appId={}, execId={})", appId, executorId);
} else {
logger.info("Cleaning up non-shuffle files in executor {}'s {} local dirs", fullId,
executor.localDirs.length);
// Execute the actual deletion in a different thread, as it may take some time.
directoryCleaner.execute(() -> deleteNonShuffleFiles(executor.localDirs));
}
}
/**
* Synchronously deletes each directory one at a time.
* Should be executed in its own thread, as this may take a long time.
@@ -226,6 +246,29 @@ public class ExternalShuffleBlockResolver {
}
}
/**
* Synchronously deletes non-shuffle files in each directory recursively.
* Should be executed in its own thread, as this may take a long time.
*/
private void deleteNonShuffleFiles(String[] dirs) {
FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
// Don't delete shuffle data or shuffle index files.
return !name.endsWith(".index") && !name.endsWith(".data");
}
};
for (String localDir : dirs) {
try {
JavaUtils.deleteRecursively(new File(localDir), filter);
logger.debug("Successfully cleaned up non-shuffle files in directory: {}", localDir);
} catch (Exception e) {
logger.error("Failed to delete non-shuffle files in directory: " + localDir, e);
}
}
}
/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,

View file

@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class NonShuffleFilesCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
@Test
public void cleanupOnRemovedExecutorWithShuffleFiles() throws IOException {
cleanupOnRemovedExecutor(true);
}
@Test
public void cleanupOnRemovedExecutorWithoutShuffleFiles() throws IOException {
cleanupOnRemovedExecutor(false);
}
private void cleanupOnRemovedExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
resolver.executorRemoved("exec0", "app");
assertCleanedUp(dataContext);
}
@Test
public void cleanupUsesExecutorWithShuffleFiles() throws IOException {
cleanupUsesExecutor(true);
}
@Test
public void cleanupUsesExecutorWithoutShuffleFiles() throws IOException {
cleanupUsesExecutor(false);
}
private void cleanupUsesExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
AtomicBoolean cleanupCalled = new AtomicBoolean(false);
// Executor which does nothing to ensure we're actually using it.
Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
ExternalShuffleBlockResolver manager =
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
manager.executorRemoved("exec0", "app");
assertTrue(cleanupCalled.get());
assertStillThere(dataContext);
}
@Test
public void cleanupOnlyRemovedExecutorWithShuffleFiles() throws IOException {
cleanupOnlyRemovedExecutor(true);
}
@Test
public void cleanupOnlyRemovedExecutorWithoutShuffleFiles() throws IOException {
cleanupOnlyRemovedExecutor(false);
}
private void cleanupOnlyRemovedExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext0 = initDataContext(withShuffleFiles);
TestShuffleDataContext dataContext1 = initDataContext(withShuffleFiles);
ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
resolver.executorRemoved("exec-nonexistent", "app");
assertStillThere(dataContext0);
assertStillThere(dataContext1);
resolver.executorRemoved("exec0", "app");
assertCleanedUp(dataContext0);
assertStillThere(dataContext1);
resolver.executorRemoved("exec1", "app");
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
// Make sure it's not an error to cleanup multiple times
resolver.executorRemoved("exec1", "app");
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
}
@Test
public void cleanupOnlyRegisteredExecutorWithShuffleFiles() throws IOException {
cleanupOnlyRegisteredExecutor(true);
}
@Test
public void cleanupOnlyRegisteredExecutorWithoutShuffleFiles() throws IOException {
cleanupOnlyRegisteredExecutor(false);
}
private void cleanupOnlyRegisteredExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);
ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
resolver.executorRemoved("exec1", "app");
assertStillThere(dataContext);
resolver.executorRemoved("exec0", "app");
assertCleanedUp(dataContext);
}
private static void assertStillThere(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
}
}
private static FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
// Don't delete shuffle data or shuffle index files.
return !name.endsWith(".index") && !name.endsWith(".data");
}
};
private static boolean assertOnlyShuffleDataInDir(File[] dirs) {
for (File dir : dirs) {
assertTrue(dir.getName() + " wasn't cleaned up", !dir.exists() ||
dir.listFiles(filter).length == 0 || assertOnlyShuffleDataInDir(dir.listFiles()));
}
return true;
}
private static void assertCleanedUp(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
File[] dirs = new File[] {new File(localDir)};
assertOnlyShuffleDataInDir(dirs);
}
}
private static TestShuffleDataContext initDataContext(boolean withShuffleFiles)
throws IOException {
if (withShuffleFiles) {
return initDataContextWithShuffleFiles();
} else {
return initDataContextWithoutShuffleFiles();
}
}
private static TestShuffleDataContext initDataContextWithShuffleFiles() throws IOException {
TestShuffleDataContext dataContext = createDataContext();
createShuffleFiles(dataContext);
createNonShuffleFiles(dataContext);
return dataContext;
}
private static TestShuffleDataContext initDataContextWithoutShuffleFiles() throws IOException {
TestShuffleDataContext dataContext = createDataContext();
createNonShuffleFiles(dataContext);
return dataContext;
}
private static TestShuffleDataContext createDataContext() {
TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
dataContext.create();
return dataContext;
}
private static void createShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
Random rand = new Random(123);
dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
"ABC".getBytes(StandardCharsets.UTF_8),
"DEF".getBytes(StandardCharsets.UTF_8)});
}
private static void createNonShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
// Create spill file(s)
dataContext.insertSpillData();
}
}

View file

@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
@@ -94,6 +95,20 @@ public class TestShuffleDataContext {
}
}
/** Creates spill file(s) within the local dirs. */
public void insertSpillData() throws IOException {
String filename = "temp_local_" + UUID.randomUUID();
OutputStream dataStream = null;
try {
dataStream = new FileOutputStream(
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename));
dataStream.write(42);
} finally {
Closeables.close(dataStream, false);
}
}
/**
* Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
* context's directories.

View file

@@ -94,6 +94,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
}
/** Clean up all the non-shuffle files associated with an executor that has exited. */
def executorRemoved(executorId: String, appId: String): Unit = {
blockHandler.executorRemoved(executorId, appId)
}
def stop() {
if (server != null) {
server.close()

View file

@@ -23,6 +23,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale, UUID}
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.function.Supplier
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
@@ -49,7 +50,8 @@ private[deploy] class Worker(
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
val securityMgr: SecurityManager,
externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {
private val host = rpcEnv.address.host
@@ -97,6 +99,10 @@ private[deploy] class Worker(
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
// Whether or not to clean up the non-shuffle files on executor exit.
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
@@ -142,7 +148,11 @@ private[deploy] class Worker(
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val shuffleService = if (externalShuffleServiceSupplier != null) {
externalShuffleServiceSupplier.get()
} else {
new ExternalShuffleService(conf, securityMgr)
}
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -732,6 +742,9 @@ private[deploy] class Worker(
trimFinishedExecutorsIfNecessary()
coresUsed -= executor.cores
memoryUsed -= executor.memory
if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) {
shuffleService.executorRemoved(executorStateChanged.execId.toString, appId)
}
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +

View file

@@ -17,10 +17,19 @@
package org.apache.spark.deploy.worker
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
@@ -29,6 +38,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
import org.apache.spark.deploy.DeployTestUtils._
@Mock(answer = RETURNS_SMART_NULLS) private var shuffleService: ExternalShuffleService = _
def cmd(javaOpts: String*): Command = {
Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
}
@@ -36,15 +47,21 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
private var _worker: Worker = _
private def makeWorker(conf: SparkConf): Worker = {
private def makeWorker(
conf: SparkConf,
shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker = {
assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr)
"Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
_worker
}
before {
MockitoAnnotations.initMocks(this)
}
after {
if (_worker != null) {
_worker.rpcEnv.shutdown()
@@ -194,4 +211,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
assert(worker.finishedDrivers.size === expectedValue)
}
}
test("cleanup non-shuffle files after executor exits when config " +
"spark.storage.cleanupFilesAfterExecutorExit=true") {
testCleanupFilesWithConfig(true)
}
test("don't cleanup non-shuffle files after executor exits when config " +
"spark.storage.cleanupFilesAfterExecutorExit=false") {
testCleanupFilesWithConfig(false)
}
private def testCleanupFilesWithConfig(value: Boolean) = {
val conf = new SparkConf().set("spark.storage.cleanupFilesAfterExecutorExit", value.toString)
val cleanupCalled = new AtomicBoolean(false)
when(shuffleService.executorRemoved(any[String], any[String])).thenAnswer(new Answer[Unit] {
override def answer(invocations: InvocationOnMock): Unit = {
cleanupCalled.set(true)
}
})
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
val worker = makeWorker(conf, externalShuffleServiceSupplier)
// initialize executors
for (i <- 0 until 10) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
}
worker.handleExecutorStateChanged(
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(cleanupCalled.get() == value)
}
}

View file

@@ -254,6 +254,18 @@ SPARK_WORKER_OPTS supports the following system properties:
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
<td>true</td>
<td>
Enable cleanup of non-shuffle files (such as temporary shuffle blocks, cached RDD/broadcast blocks,
spill files, etc.) of worker directories following executor exits. Note that this doesn't
overlap with `spark.worker.cleanup.enabled`, as this enables cleanup of non-shuffle files in the
local directories of a dead executor, while `spark.worker.cleanup.enabled` enables cleanup of
all files/subdirectories of a stopped and timed-out application.
This only affects Standalone mode; support for other cluster managers can be added in the future.
</td>
</tr>
<tr>
<td><code>spark.worker.ui.compressedLogFileLengthCacheSize</code></td>
<td>100</td>