[SPARK-14513][CORE] Fix threads left behind after stopping SparkContext
## What changes were proposed in this pull request? Shutting down `QueuedThreadPool` used by Jetty `Server` to avoid threads leakage after SparkContext is stopped. Note: If this fix is going to apply to the `branch-1.6`, one more patch on the `NettyRpcEnv` class is needed so that the `NettyRpcEnv._fileServer.shutdown` is called in the `NettyRpcEnv.cleanup` method. This is due to the removal of `_fileServer` field in the `NettyRpcEnv` class in the master branch. Please advice if a second PR is necessary for bring this fix back to `branch-1.6` ## How was this patch tested? Ran the ./dev/run-tests locally Author: Terence Yim <terence@cask.co> Closes #12318 from chtyim/fixes/SPARK-14513-thread-leak.
This commit is contained in:
parent
bcd2076274
commit
3e53de4bdd
|
@ -25,6 +25,7 @@ import org.eclipse.jetty.server.Server
|
|||
import org.eclipse.jetty.server.bio.SocketConnector
|
||||
import org.eclipse.jetty.server.ssl.SslSocketConnector
|
||||
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
|
||||
import org.eclipse.jetty.util.component.LifeCycle
|
||||
import org.eclipse.jetty.util.security.{Constraint, Password}
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||
|
||||
|
@ -155,6 +156,12 @@ private[spark] class HttpServer(
|
|||
throw new ServerStateException("Server is already stopped")
|
||||
} else {
|
||||
server.stop()
|
||||
// Stop the ThreadPool if it supports stop() method (through LifeCycle).
|
||||
// It is needed because stopping the Server won't stop the ThreadPool it uses.
|
||||
val threadPool = server.getThreadPool
|
||||
if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
|
||||
threadPool.asInstanceOf[LifeCycle].stop
|
||||
}
|
||||
port = -1
|
||||
server = null
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.eclipse.jetty.server.handler._
|
|||
import org.eclipse.jetty.server.nio.SelectChannelConnector
|
||||
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
|
||||
import org.eclipse.jetty.servlet._
|
||||
import org.eclipse.jetty.util.component.LifeCycle
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||
import org.json4s.JValue
|
||||
import org.json4s.jackson.JsonMethods.{pretty, render}
|
||||
|
@ -350,4 +351,15 @@ private[spark] object JettyUtils extends Logging {
|
|||
private[spark] case class ServerInfo(
|
||||
server: Server,
|
||||
boundPort: Int,
|
||||
rootHandler: ContextHandlerCollection)
|
||||
rootHandler: ContextHandlerCollection) {
|
||||
|
||||
def stop(): Unit = {
|
||||
server.stop()
|
||||
// Stop the ThreadPool if it supports stop() method (through LifeCycle).
|
||||
// It is needed because stopping the Server won't stop the ThreadPool it uses.
|
||||
val threadPool = server.getThreadPool
|
||||
if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
|
||||
threadPool.asInstanceOf[LifeCycle].stop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ private[spark] abstract class WebUI(
|
|||
def stop() {
|
||||
assert(serverInfo.isDefined,
|
||||
"Attempted to stop %s before binding to a server!".format(className))
|
||||
serverInfo.get.server.stop()
|
||||
serverInfo.get.stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue