[SPARK-16416][CORE] force eager creation of loggers to avoid shutdown hook conflicts

## What changes were proposed in this pull request?

Force eager creation of loggers in the classes that log from JVM shutdown hooks. Spark's `Logging` trait creates the underlying logger lazily on first use; if that first use happens inside a shutdown hook while Log4j 2's own shutdown hook is already stopping the logging system, logging fails. Touching the logger up front (e.g. `logDebug("init")`) ensures it exists before any shutdown hook runs.
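
Below is a minimal sketch of the pattern, using a simplified stand-in for Spark's `Logging` trait and a hypothetical `CleanupService` class; neither is code from this commit.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Simplified stand-in for Spark's Logging trait: the underlying
// SLF4J logger is only created on first use.
trait Logging {
  @transient private var _log: Logger = null

  protected def log: Logger = {
    if (_log == null) {
      _log = LoggerFactory.getLogger(getClass.getName)
    }
    _log
  }

  protected def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)
  protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
}

// Hypothetical component that previously logged only from its shutdown hook.
class CleanupService extends Logging {
  logDebug("init") // force eager creation of logger, before any shutdown hook can run

  sys.addShutdownHook {
    // The logger already exists here, so nothing has to be initialized while
    // Log4j 2's own shutdown hook may already be tearing the logging system down.
    logInfo("Shutdown hook called")
  }
}
```

Spark's real `Logging` trait does more (log4j initialization, level handling), but the lazy creation of `log` is the behavior that matters here.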

## How was this patch tested?

Manually tested with a project using Log4j 2; verified that the shutdown hook conflict was resolved.

Author: Mikael Ståldal <mikael.staldal@magine.com>

Closes #14320 from mikaelstaldal/shutdown-hook-logging.
Authored by Mikael Ståldal on 2016-07-24 11:16:24 +01:00; committed by Sean Owen
parent 37bed97de5
commit 23e047f460
8 changed files with 10 additions and 0 deletions

@@ -46,6 +46,8 @@ private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
logDebug("init") // force eager creation of logger
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort

@@ -556,6 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")

@@ -128,6 +128,7 @@ object ExternalShuffleService extends Logging {
server = newShuffleService(sparkConf, securityManager)
server.start()
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook { () =>
logInfo("Shutting down shuffle service.")
server.stop()

@@ -104,6 +104,7 @@ private[mesos] object MesosClusterDispatcher extends Logging {
}
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
dispatcher.start()
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook { () =>
logInfo("Shutdown hook is shutting down dispatcher")
dispatcher.stop()

@@ -184,6 +184,8 @@ private[spark] object OutputCommitCoordinator {
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
extends RpcEndpoint with Logging {
logDebug("init") // force eager creation of logger
override def receive: PartialFunction[Any, Unit] = {
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")

@@ -141,6 +141,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
}
private def addShutdownHook(): AnyRef = {
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()

@@ -54,6 +54,7 @@ private[spark] object ShutdownHookManager extends Logging {
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
// Add a shutdown hook to delete the temp dirs when the JVM exits
logDebug("Adding shutdown hook") // force eager creation of logger
addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
logInfo("Shutdown hook called")
// we need to materialize the paths to delete because deleteRecursively removes items from

@@ -592,6 +592,7 @@ class StreamingContext private[streaming] (
}
StreamingContext.setActiveContext(this)
}
logDebug("Adding shutdown hook") // force eager creation of logger
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext