StreamingContext.start() can throw exception because DStream.validateAtStart() fails (say, checkpoint directory not set for StateDStream). But by then JobScheduler, JobGenerator, and ReceiverTracker has already started, along with their actors. But those cannot be shutdown because the only way to do that is call StreamingContext.stop() which cannot be called as the context has not been marked as ACTIVE.
The solution in this PR is to stop the internal scheduler if start throw exception, and mark the context as STOPPED.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6559 from tdas/SPARK-7958 and squashes the following commits:
20b2ec1 [Tathagata Das] Added synchronized
790b617 [Tathagata Das] Handled exception in StreamingContext.start()
(cherry picked from commit 2f9c7519d6)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Attempts to restart the socket receiver when it is supposed to be stopped causes undesirable error messages.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6483 from tdas/SPARK-7931 and squashes the following commits:
09aeee1 [Tathagata Das] Do not restart receiver when stopped
So we can enable a whitespace enforcement rule in the style checker to save code review time.
Author: Reynold Xin <rxin@databricks.com>
Closes#6475 from rxin/whitespace-streaming and squashes the following commits:
810dae4 [Reynold Xin] Fixed tests.
89068ad [Reynold Xin] [SPARK-7927] whitespace fixes for streaming.
(cherry picked from commit 3af0b3136e)
Signed-off-by: Reynold Xin <rxin@databricks.com>
In the old implementation, if a batch has no block, `areWALRecordHandlesPresent` will be `true` and it will return `WriteAheadLogBackedBlockRDD`.
This PR handles this case by returning `WriteAheadLogBackedBlockRDD` or `BlockRDD` according to the configuration.
Author: zsxwing <zsxwing@gmail.com>
Closes#6372 from zsxwing/SPARK-7777 and squashes the following commits:
788f895 [zsxwing] Handle the case when there is no block in a batch
(cherry picked from commit ad0badba14)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6369 from tdas/SPARK-7838 and squashes the following commits:
87d1c7f [Tathagata Das] Addressed comment
37775d8 [Tathagata Das] set scope for kinesis stream
(cherry picked from commit baa89838cc)
Signed-off-by: Andrew Or <andrew@databricks.com>
Shutdown hook to stop SparkContext was added recently. This results in ugly errors when a streaming application is terminated by ctrl-C.
```
Exception in thread "Thread-27" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1642)
at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:559)
at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2266)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2236)
at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2218)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
```
This is because the Spark's shutdown hook stops the context, and the streaming jobs fail in the middle. The correct solution is to stop the streaming context before the spark context. This PR adds the shutdown hook to do so with a priority higher than the SparkContext's shutdown hooks priority.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6307 from tdas/SPARK-7776 and squashes the following commits:
e3d5475 [Tathagata Das] Added conf to specify graceful shutdown
4c18652 [Tathagata Das] Added shutdown hook to StreamingContxt.
(cherry picked from commit d68ea24d60)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Assertions can be turned off. `require` throws an `IllegalArgumentException` which makes more sense when it's a user set variable.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#6271 from brkyvz/streaming-require and squashes the following commits:
d249484 [Burak Yavuz] fix merge conflict
264adb8 [Burak Yavuz] addressed comments v1.0
6161350 [Burak Yavuz] fix tests
16aa766 [Burak Yavuz] changed more assertions to more meaningful errors
afd923d [Burak Yavuz] changed some assertions to require
(cherry picked from commit 1ee8eb431e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Just added a guard to make sure a batch has completed before moving to the next batch.
Author: zsxwing <zsxwing@gmail.com>
Closes#6306 from zsxwing/SPARK-7777 and squashes the following commits:
ecee529 [zsxwing] Fix the failure message
58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
(cherry picked from commit 895baf8f77)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Currently, the background checkpointing thread fails silently if the checkpoint is not serializable. It is hard to debug and therefore its best to fail fast at `start()` when checkpointing is enabled and the checkpoint is not serializable.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#6292 from tdas/SPARK-7767 and squashes the following commits:
51304e6 [Tathagata Das] Addressed comments.
c35237b [Tathagata Das] Added test for checkpoint serialization in StreamingContext.start()
(cherry picked from commit 3c434cbfd0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
SPARK-7741 is the equivalent of SPARK-7237 in streaming. This is an alternative to #6268.
Author: Andrew Or <andrew@databricks.com>
Closes#6269 from andrewor14/clean-moar and squashes the following commits:
c51c9ab [Andrew Or] Add periods (trivial)
6c686ac [Andrew Or] Merge branch 'master' of github.com:apache/spark into clean-moar
79a435b [Andrew Or] Fix tests
d18c9f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into clean-moar
65ef07b [Andrew Or] Fix tests?
4b487a3 [Andrew Or] Add tests for closures passed to DStream operations
328139b [Andrew Or] Do not forget foreachRDD
5431f61 [Andrew Or] Clean streaming closures
72b7b73 [Andrew Or] Clean core closures
(cherry picked from commit 9b84443dd4)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>