[SPARK-8393][STREAMING] JavaStreamingContext#awaitTermination() throws non-declared InterruptedException
## What changes were proposed in this pull request? `JavaStreamingContext.awaitTermination` methods should be declared as `throws[InterruptedException]` so that this exception can be handled in Java code. Note this is not just a doc change, but an API change, since now (in Java) the method has a checked exception to handle. All await-like methods in Java APIs behave this way, so seems worthwhile for 2.0. ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #12418 from srowen/SPARK-8393.
This commit is contained in:
parent
cb51680d22
commit
8bd05c9db2
|
@ -58,7 +58,7 @@ public class JavaStreamingTestExample {
|
|||
|
||||
private static int timeoutCounter = 0;
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length != 3) {
|
||||
System.err.println("Usage: JavaStreamingTestExample " +
|
||||
"<dataDir> <batchDuration> <numBatchesTimeout>");
|
||||
|
|
|
@ -58,7 +58,7 @@ import java.util.regex.Pattern;
|
|||
public class JavaCustomReceiver extends Receiver<String> {
|
||||
private static final Pattern SPACE = Pattern.compile(" ");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaCustomReceiver <hostname> <port>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
@ -47,7 +49,7 @@ import org.apache.spark.streaming.Durations;
|
|||
public final class JavaDirectKafkaWordCount {
|
||||
private static final Pattern SPACE = Pattern.compile(" ");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
|
||||
" <brokers> is a list of one or more Kafka brokers\n" +
|
||||
|
@ -64,8 +66,8 @@ public final class JavaDirectKafkaWordCount {
|
|||
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
|
||||
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
|
||||
|
||||
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
|
||||
HashMap<String, String> kafkaParams = new HashMap<>();
|
||||
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
|
||||
Map<String, String> kafkaParams = new HashMap<>();
|
||||
kafkaParams.put("metadata.broker.list", brokers);
|
||||
|
||||
// Create direct kafka stream with brokers and topics
|
||||
|
|
|
@ -43,7 +43,7 @@ public final class JavaFlumeEventCount {
|
|||
private JavaFlumeEventCount() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length != 2) {
|
||||
System.err.println("Usage: JavaFlumeEventCount <host> <port>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -57,7 +57,7 @@ public final class JavaKafkaWordCount {
|
|||
private JavaKafkaWordCount() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 4) {
|
||||
System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
|
|||
public final class JavaNetworkWordCount {
|
||||
private static final Pattern SPACE = Pattern.compile(" ");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -183,7 +183,7 @@ public final class JavaRecoverableNetworkWordCount {
|
|||
return ssc;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length != 4) {
|
||||
System.err.println("You arguments were " + Arrays.asList(args));
|
||||
System.err.println(
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
|
|||
public final class JavaSqlNetworkWordCount {
|
||||
private static final Pattern SPACE = Pattern.compile(" ");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -50,7 +50,7 @@ import org.apache.spark.streaming.api.java.*;
|
|||
public class JavaStatefulNetworkWordCount {
|
||||
private static final Pattern SPACE = Pattern.compile(" ");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
|
||||
System.exit(1);
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.List;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import com.amazonaws.regions.RegionUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.Function2;
|
||||
|
@ -81,9 +80,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
|
|||
*/
|
||||
public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
|
||||
private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
|
||||
private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Check that all required args were passed in.
|
||||
if (args.length != 3) {
|
||||
System.err.println(
|
||||
|
|
|
@ -558,6 +558,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
|
|||
* Wait for the execution to stop. Any exceptions that occurs during the execution
|
||||
* will be thrown in this thread.
|
||||
*/
|
||||
@throws[InterruptedException]
|
||||
def awaitTermination(): Unit = {
|
||||
ssc.awaitTermination()
|
||||
}
|
||||
|
@ -570,6 +571,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
|
|||
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
|
||||
* if the waiting time elapsed before returning from the method.
|
||||
*/
|
||||
@throws[InterruptedException]
|
||||
def awaitTerminationOrTimeout(timeout: Long): Boolean = {
|
||||
ssc.awaitTerminationOrTimeout(timeout)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue