[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
## What changes were proposed in this pull request?

When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of `Py4JException` errors before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15201 from zsxwing/stop-jvm-ssc.
This commit is contained in:
parent
85d609cf25
commit
3cdae0ff2f
|
@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.language.existentials
|
import scala.language.existentials
|
||||||
|
|
||||||
|
import py4j.Py4JException
|
||||||
|
|
||||||
import org.apache.spark.SparkException
|
import org.apache.spark.SparkException
|
||||||
import org.apache.spark.api.java._
|
import org.apache.spark.api.java._
|
||||||
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.storage.StorageLevel
|
import org.apache.spark.storage.StorageLevel
|
||||||
import org.apache.spark.streaming.{Duration, Interval, Time}
|
import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
|
||||||
import org.apache.spark.streaming.api.java._
|
import org.apache.spark.streaming.api.java._
|
||||||
import org.apache.spark.streaming.dstream._
|
import org.apache.spark.streaming.dstream._
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
|
||||||
/**
|
/**
|
||||||
* Helper functions, which are called from Python via Py4J.
|
* Helper functions, which are called from Python via Py4J.
|
||||||
*/
|
*/
|
||||||
private[python] object PythonDStream {
|
private[streaming] object PythonDStream {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* can not access PythonTransformFunctionSerializer.register() via Py4j
|
* can not access PythonTransformFunctionSerializer.register() via Py4j
|
||||||
|
@ -184,6 +187,32 @@ private[python] object PythonDStream {
|
||||||
rdds.asScala.foreach(queue.add)
|
rdds.asScala.foreach(queue.add)
|
||||||
queue
|
queue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
|
||||||
|
* stop it in the Python side.
|
||||||
|
*/
|
||||||
|
def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
|
||||||
|
// These two special messages are from:
|
||||||
|
// scalastyle:off
|
||||||
|
// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
|
||||||
|
// https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
|
||||||
|
// scalastyle:on
|
||||||
|
if (e.isInstanceOf[Py4JException] &&
|
||||||
|
("Cannot obtain a new communication channel" == e.getMessage ||
|
||||||
|
"Error while obtaining a new communication channel" == e.getMessage)) {
|
||||||
|
// Start a new thread to stop StreamingContext to avoid deadlock.
|
||||||
|
new Thread("Stop-StreamingContext") with Logging {
|
||||||
|
setDaemon(true)
|
||||||
|
|
||||||
|
override def run(): Unit = {
|
||||||
|
logError(
|
||||||
|
"Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
|
||||||
|
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
|
||||||
|
}
|
||||||
|
}.start()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
|
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
|
||||||
|
import org.apache.spark.streaming.api.python.PythonDStream
|
||||||
import org.apache.spark.streaming.util.RecurringTimer
|
import org.apache.spark.streaming.util.RecurringTimer
|
||||||
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
|
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
|
||||||
|
|
||||||
|
@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
||||||
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
|
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
|
||||||
case Failure(e) =>
|
case Failure(e) =>
|
||||||
jobScheduler.reportError("Error generating jobs for time " + time, e)
|
jobScheduler.reportError("Error generating jobs for time " + time, e)
|
||||||
|
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
|
||||||
}
|
}
|
||||||
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
|
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.spark.ExecutorAllocationClient
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
|
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
|
||||||
import org.apache.spark.streaming._
|
import org.apache.spark.streaming._
|
||||||
|
import org.apache.spark.streaming.api.python.PythonDStream
|
||||||
import org.apache.spark.streaming.ui.UIUtils
|
import org.apache.spark.streaming.ui.UIUtils
|
||||||
import org.apache.spark.util.{EventLoop, ThreadUtils}
|
import org.apache.spark.util.{EventLoop, ThreadUtils}
|
||||||
|
|
||||||
|
@ -217,6 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
|
||||||
private def handleError(msg: String, e: Throwable) {
|
private def handleError(msg: String, e: Throwable) {
|
||||||
logError(msg, e)
|
logError(msg, e)
|
||||||
ssc.waiter.notifyError(e)
|
ssc.waiter.notifyError(e)
|
||||||
|
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
private class JobHandler(job: Job) extends Runnable with Logging {
|
private class JobHandler(job: Job) extends Runnable with Logging {
|
||||||
|
|
Loading…
Reference in a new issue