SPARK-3470 [CORE] [STREAMING] Add Closeable / close() to Java context objects
... that expose a stop() lifecycle method. This doesn't add `AutoCloseable`, which is Java 7+ only. But it should be possible to use try-with-resources on a `Closeable` in Java 7, as long as the `close()` does not throw a checked exception, and these don't. Q.E.D. Author: Sean Owen <sowen@cloudera.com> Closes #2346 from srowen/SPARK-3470 and squashes the following commits: 612c21d [Sean Owen] Add Closeable / close() to Java context objects that expose a stop() lifecycle method
This commit is contained in:
parent
e11eeb71fa
commit
feaa3706f1
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.api.java
|
package org.apache.spark.api.java
|
||||||
|
|
||||||
|
import java.io.Closeable
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.{Map => JMap}
|
import java.util.{Map => JMap}
|
||||||
|
|
||||||
|
@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
|
||||||
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
|
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
|
||||||
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
|
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
|
||||||
*/
|
*/
|
||||||
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
|
class JavaSparkContext(val sc: SparkContext)
|
||||||
|
extends JavaSparkContextVarargsWorkaround with Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a JavaSparkContext that loads settings from system properties (for instance, when
|
* Create a JavaSparkContext that loads settings from system properties (for instance, when
|
||||||
* launching with ./bin/spark-submit).
|
* launching with ./bin/spark-submit).
|
||||||
|
@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
||||||
sc.stop()
|
sc.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def close(): Unit = stop()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get Spark's home location from either a value set through the constructor,
|
* Get Spark's home location from either a value set through the constructor,
|
||||||
* or the spark.home Java property, or the SPARK_HOME environment variable
|
* or the spark.home Java property, or the SPARK_HOME environment variable
|
||||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import java.io.InputStream
|
import java.io.{Closeable, InputStream}
|
||||||
import java.util.{List => JList, Map => JMap}
|
import java.util.{List => JList, Map => JMap}
|
||||||
|
|
||||||
import akka.actor.{Props, SupervisorStrategy}
|
import akka.actor.{Props, SupervisorStrategy}
|
||||||
|
@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver
|
||||||
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
|
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
|
||||||
* termination of a context by `stop()` or by an exception.
|
* termination of a context by `stop()` or by an exception.
|
||||||
*/
|
*/
|
||||||
class JavaStreamingContext(val ssc: StreamingContext) {
|
class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a StreamingContext.
|
* Create a StreamingContext.
|
||||||
|
@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
||||||
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
|
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
|
||||||
ssc.stop(stopSparkContext, stopGracefully)
|
ssc.stop(stopSparkContext, stopGracefully)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def close(): Unit = stop()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue