Remove simple redundant return statements for Scala methods/functions
Remove simple redundant return statements for Scala methods/functions:
-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
-) Add small changes to making var to val if possible and remove () for simple get
This hopefully makes the review simpler =)
Pass compile and tests.
Better error handling in Spark Streaming and more API cleanup
Earlier errors in jobs generated by Spark Streaming (or in the generation of jobs) could not be caught from the main driver thread (i.e. the thread that called StreamingContext.start()) as it would be thrown in different threads. With this change, after `ssc.start`, one can call `ssc.awaitTermination()` which will be block until the ssc is closed, or there is an exception. This makes it easier to debug.
This change also adds ssc.stop(<stop-spark-context>) where you can stop StreamingContext without stopping the SparkContext.
Also fixes the bug that came up with PRs #393 and #381. MetadataCleaner default value has been changed from 3500 to -1 for normal SparkContext and 3600 when creating a StreamingContext. Also, updated StreamingListenerBus with changes similar to SparkListenerBus in #392.
And changed a lot of protected[streaming] to private[streaming].
Rename DStream.foreach to DStream.foreachRDD
`foreachRDD` makes it clear that the granularity of this operator is per-RDD.
As it stands, `foreach` is inconsistent with with `map`, `filter`, and the other
DStream operators which get pushed down to individual records within each RDD.
`foreachRDD` makes it clear that the granularity of this operator is per-RDD.
As it stands, `foreach` is inconsistent with with `map`, `filter`, and the other
DStream operators which get pushed down to individual records within each RDD.
Setting load defaults to true in executor
This preserves the behavior in earlier releases. If properties are set for the executors via `spark-env.sh` on the slaves, then they should take precedence over spark defaults. This is useful for if system administrators are setting properties for a standalone cluster, such as shuffle locations.
/cc @andrewor14 who initially reported this issue.
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
Otherwise this leads to hundreds of SparkListenerBus daemon threads in our unit tests (and also problematic if user applications launches multiple SparkContext).
Revert PR 381
This PR missed a bunch of test cases that require "spark.cleaner.ttl". I think it is what is causing test failures on Jenkins right now (though it's a bit hard to tell because the DNS for cs.berkeley.edu is down).
I'm submitting this to see if it fixes jeknins. I did try just patching various tests but it was taking a really long time because there are a bunch of them, so for now I'm just seeing if a revert works.
We clone hadoop key and values by default and reuse objects if asked to.
We try to clone for most common types of writables and we call WritableUtils.clone otherwise intention is to optimize, for example for NullWritable there is no need and for Long, int and String creating a new object with value set would be faster than doing copy on object hopefully.
There is another way to do this PR where we ask for both key and values whether to clone them or not, but could not think of a use case for it except either of them is actually a NullWritable for which I have already worked around. So thought that would be unnecessary.
Change clientId to random clientId
The client identifier should be unique across all clients connecting to the same server. A convenience method is provided to generate a random client id that should satisfy this criteria - generateClientId(). Returns a randomly generated client identifier based on the current user's login name and the system time. As the client identifier is used by the server to identify a client when it reconnects, the client must use the same identifier between connections if durable subscriptions are to be used.
API for automatic driver recovery for streaming programs and other bug fixes
1. Added Scala and Java API for automatically loading checkpoint if it exists in the provided checkpoint directory.
Scala API: `StreamingContext.getOrCreate(<checkpoint dir>, <function to create new StreamingContext>)` returns a StreamingContext
Java API: `JavaStreamingContext.getOrCreate(<checkpoint dir>, <factory obj of type JavaStreamingContextFactory>)`, return a JavaStreamingContext
See the RecoverableNetworkWordCount below as an example of how to use it.
2. Refactored streaming.Checkpoint*** code to fix bugs and make the DStream metadata checkpoint writing and reading more robust. Specifically, it fixes and improves the logic behind backing up and writing metadata checkpoint files. Also, it ensure that spark.driver.* and spark.hostPort is cleared from SparkConf before being written to checkpoint.
3. Fixed bug in cleaning up of checkpointed RDDs created by DStream. Specifically, this fix ensures that checkpointed RDD's files are not prematurely cleaned up, thus ensuring reliable recovery.
4. TimeStampedHashMap is upgraded to optionally update the timestamp on map.get(key). This allows clearing of data based on access time (i.e., clear records were last accessed before a threshold timestamp).
5. Added caching for file modification time in FileInputDStream using the updated TimeStampedHashMap. Without the caching, enumerating the mod times to find new files can take seconds if there are 1000s of files. This cache is automatically cleared.
This PR is not entirely final as I may make some minor additions - a Java examples, and adding StreamingContext.getOrCreate to unit test.
Edit: Java example to be added later, unit test added.
External Sorting for Aggregator and CoGroupedRDDs (Revisited)
(This pull request is re-opened from https://github.com/apache/incubator-spark/pull/303, which was closed because Jenkins / github was misbehaving)
The target issue for this patch is the out-of-memory exceptions triggered by aggregate operations such as reduce, groupBy, join, and cogroup. The existing AppendOnlyMap used by these operations resides purely in memory, and grows with the size of the input data until the amount of allocated memory is exceeded. Under large workloads, this problem is aggravated by the fact that OOM frequently occurs only after a very long (> 1 hour) map phase, in which case the entire job must be restarted.
The solution is to spill the contents of this map to disk once a certain memory threshold is exceeded. This functionality is provided by ExternalAppendOnlyMap, which additionally sorts this buffer before writing it out to disk, and later merges these buffers back in sorted order.
Under normal circumstances in which OOM is not triggered, ExternalAppendOnlyMap is simply a wrapper around AppendOnlyMap and incurs little overhead. Only when the memory usage is expected to exceed the given threshold does ExternalAppendOnlyMap spill to disk.
Aside from trivial formatting changes, use nulls instead of Options for
DiskMapIterator, and add documentation for spark.shuffle.externalSorting
and spark.shuffle.memoryFraction.
Also, set spark.shuffle.memoryFraction to 0.3, and spark.storage.memoryFraction = 0.6.
Yarn client addjar and misc fixes
Fix the addJar functionality in yarn-client mode, add support for the other options supported in yarn-standalone mode, set the application type on yarn in hadoop 2.X, add documentation, change heartbeat interval to be same code as the yarn-standalone so it doesn't take so long to get containers and exit.
Make DEBUG-level logs consummable.
Removes two things that caused issues with the debug logs:
(a) Internal polling in the DAGScheduler was polluting the logs.
(b) The Scala REPL logs were really noisy.