[SPARK-26152] Synchronize Worker Cleanup with Worker Shutdown

## What changes were proposed in this pull request?

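This patch fixes a race between the handling of the org.apache.spark.deploy.DeployMessages.WorkDirCleanup event and org.apache.spark.deploy.worker.Worker#onStop. It is possible that, while the WorkDirCleanup event is being processed, org.apache.spark.deploy.worker.Worker#cleanupThreadExecutor has already been shut down; any task submitted to that ThreadPoolExecutor after the shutdown results in a java.util.concurrent.RejectedExecutionException. The cleanup submissions are now wrapped in a try/catch that, when the executor has been shut down, catches this exception and logs a warning instead of letting it propagate.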

## How was this patch tested?

Manually

Closes #24056 from ajithme/workercleanup.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
Ajith 2019-03-14 09:16:29 -05:00 committed by Sean Owen
parent bacffb8810
commit 2a04de52dd

View file

@ -450,27 +450,32 @@ private[deploy] class Worker(
// rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
val cleanupFuture = concurrent.Future {
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
}
appDirs.filter { dir =>
// the directory is used by an application - check that the application is not running
// when cleaning up
val appIdFromDir = dir.getName
val isAppStillRunning = appIds.contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
}(cleanupThreadExecutor)
try {
val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
}
appDirs.filter { dir =>
// the directory is used by an application - check that the application is not running
// when cleaning up
val appIdFromDir = dir.getName
val isAppStillRunning = appIds.contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
}(cleanupThreadExecutor)
cleanupFuture.failed.foreach(e =>
logError("App dir cleanup failed: " + e.getMessage, e)
)(cleanupThreadExecutor)
cleanupFuture.failed.foreach(e =>
logError("App dir cleanup failed: " + e.getMessage, e)
)(cleanupThreadExecutor)
} catch {
case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
logWarning("Failed to cleanup work dir as executor pool was shutdown")
}
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@ -634,15 +639,20 @@ private[deploy] class Worker(
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach { dirList =>
concurrent.Future {
logInfo(s"Cleaning up local directories for application $id")
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}(cleanupThreadExecutor).failed.foreach(e =>
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
)(cleanupThreadExecutor)
try {
appDirectories.remove(id).foreach { dirList =>
concurrent.Future {
logInfo(s"Cleaning up local directories for application $id")
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}(cleanupThreadExecutor).failed.foreach(e =>
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
)(cleanupThreadExecutor)
}
} catch {
case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
logWarning("Failed to cleanup application as executor pool was shutdown")
}
shuffleService.applicationRemoved(id)
}