[SPARK-12365][CORE] Use ShutdownHookManager where Runtime.getRuntime.addShutdownHook() is called
SPARK-9886 fixed ExternalBlockStore.scala This PR fixes the remaining references to Runtime.getRuntime.addShutdownHook() Author: tedyu <yuzhihong@gmail.com> Closes #10325 from ted-yu/master.
This commit is contained in:
parent
38d9795a4f
commit
f590178d7a
|
@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap
|
|||
import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
|
||||
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
|
||||
import org.apache.spark.network.util.TransportConf
|
||||
import org.apache.spark.util.Utils
|
||||
import org.apache.spark.util.{ShutdownHookManager, Utils}
|
||||
|
||||
/**
|
||||
* Provides a server from which Executors can read shuffle files (rather than reading directly from
|
||||
|
@ -118,19 +118,13 @@ object ExternalShuffleService extends Logging {
|
|||
server = newShuffleService(sparkConf, securityManager)
|
||||
server.start()
|
||||
|
||||
installShutdownHook()
|
||||
ShutdownHookManager.addShutdownHook { () =>
|
||||
logInfo("Shutting down shuffle service.")
|
||||
server.stop()
|
||||
barrier.countDown()
|
||||
}
|
||||
|
||||
// keep running until the process is terminated
|
||||
barrier.await()
|
||||
}
|
||||
|
||||
private def installShutdownHook(): Unit = {
|
||||
Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
|
||||
override def run() {
|
||||
logInfo("Shutting down shuffle service.")
|
||||
server.stop()
|
||||
barrier.countDown()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
|
|||
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
|
||||
import org.apache.spark.deploy.rest.mesos.MesosRestServer
|
||||
import org.apache.spark.scheduler.cluster.mesos._
|
||||
import org.apache.spark.util.SignalLogger
|
||||
import org.apache.spark.util.{ShutdownHookManager, SignalLogger}
|
||||
import org.apache.spark.{Logging, SecurityManager, SparkConf}
|
||||
|
||||
/*
|
||||
|
@ -103,14 +103,11 @@ private[mesos] object MesosClusterDispatcher extends Logging {
|
|||
}
|
||||
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
|
||||
dispatcher.start()
|
||||
val shutdownHook = new Thread() {
|
||||
override def run() {
|
||||
logInfo("Shutdown hook is shutting down dispatcher")
|
||||
dispatcher.stop()
|
||||
dispatcher.awaitShutdown()
|
||||
}
|
||||
ShutdownHookManager.addShutdownHook { () =>
|
||||
logInfo("Shutdown hook is shutting down dispatcher")
|
||||
dispatcher.stop()
|
||||
dispatcher.awaitShutdown()
|
||||
}
|
||||
Runtime.getRuntime.addShutdownHook(shutdownHook)
|
||||
dispatcher.awaitShutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,7 +162,9 @@ private[spark] object ShutdownHookManager extends Logging {
|
|||
val hook = new Thread {
|
||||
override def run() {}
|
||||
}
|
||||
// scalastyle:off runtimeaddshutdownhook
|
||||
Runtime.getRuntime.addShutdownHook(hook)
|
||||
// scalastyle:on runtimeaddshutdownhook
|
||||
Runtime.getRuntime.removeShutdownHook(hook)
|
||||
} catch {
|
||||
case ise: IllegalStateException => return true
|
||||
|
@ -228,7 +230,9 @@ private [util] class SparkShutdownHookManager {
|
|||
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
|
||||
|
||||
case Failure(_) =>
|
||||
// scalastyle:off runtimeaddshutdownhook
|
||||
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
|
||||
// scalastyle:on runtimeaddshutdownhook
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -157,6 +157,18 @@ This file is divided into 3 sections:
|
|||
]]></customMessage>
|
||||
</check>
|
||||
|
||||
<check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
|
||||
<parameters><parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter></parameters>
|
||||
<customMessage><![CDATA[
|
||||
Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use
|
||||
ShutdownHookManager.addShutdownHook instead.
|
||||
If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with
|
||||
// scalastyle:off runtimeaddshutdownhook
|
||||
Runtime.getRuntime.addShutdownHook(...)
|
||||
// scalastyle:on runtimeaddshutdownhook
|
||||
]]></customMessage>
|
||||
</check>
|
||||
|
||||
<check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
|
||||
<parameters><parameter name="regex">Class\.forName</parameter></parameters>
|
||||
<customMessage><![CDATA[
|
||||
|
|
|
@ -195,20 +195,18 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
}
|
||||
|
||||
// add shutdown hook to flush the history to history file
|
||||
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() {
|
||||
override def run() = {
|
||||
reader.getHistory match {
|
||||
case h: FileHistory =>
|
||||
try {
|
||||
h.flush()
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
logWarning("WARNING: Failed to write command history file: " + e.getMessage)
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
ShutdownHookManager.addShutdownHook { () =>
|
||||
reader.getHistory match {
|
||||
case h: FileHistory =>
|
||||
try {
|
||||
h.flush()
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
logWarning("WARNING: Failed to write command history file: " + e.getMessage)
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
// TODO: missing
|
||||
/*
|
||||
|
|
Loading…
Reference in a new issue