[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails

## What changes were proposed in this pull request?

Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown

While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation.

## How was this patch tested?

Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here.

Author: Sean Owen <sowen@cloudera.com>

Closes #13973 from srowen/SPARK-16182.
This commit is contained in:
Sean Owen 2016-07-01 09:22:27 +01:00
parent fbfd0ab9d7
commit 2075bf8ef6
2 changed files with 47 additions and 31 deletions

View file

@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
}
/**
* Terminates a process waiting for at most the specified duration. Returns whether
* the process terminated.
* Terminates a process waiting for at most the specified duration.
*
* @return the process exit value if it was successfully terminated, else None
*/
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
try {
// Java8 added a new API which will more forcibly kill the process. Use that if available.
val destroyMethod = process.getClass().getMethod("destroyForcibly");
destroyMethod.setAccessible(true)
destroyMethod.invoke(process)
} catch {
case NonFatal(e) =>
if (!e.isInstanceOf[NoSuchMethodException]) {
logWarning("Exception when attempting to kill process", e)
}
process.destroy()
}
// Politely destroy first
process.destroy()
if (waitForProcess(process, timeoutMs)) {
// Successful exit
Option(process.exitValue())
} else {
None
// Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
classOf[Process].getMethod("destroyForcibly").invoke(process)
} catch {
case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
if (waitForProcess(process, timeoutMs)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
None
}
}
}
/**
* Wait for a process to terminate for at most the specified duration.
* Return whether the process actually terminated after the given timeout.
*
* @return whether the process actually terminated before the given timeout.
*/
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
try {
// Use Java 8 method if available
classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
.invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
.asInstanceOf[Boolean]
} catch {
case _: NoSuchMethodError =>
// Otherwise implement it manually
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
}
Thread.sleep(100)
}
Thread.sleep(100)
}
}
true
}
true
}
/**

View file

@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(terminated.isDefined)
Utils.waitForProcess(process, 5000)
val duration = System.currentTimeMillis() - start
assert(duration < 5000)
assert(duration < 6000) // add a little extra time to allow a force kill to finish
assert(!pidExists(pid))
} finally {
signal(pid, "SIGKILL")