Automatically unpersisting RDDs that have been cleaned up from DStreams
Earlier RDDs generated by DStreams were forgotten but not unpersisted. The system relied on the natural BlockManager LRU to drop the data. The cleaner.ttl was a hammer to clean up RDDs but it is something that needs to be set separately and need to be set very conservatively (at best, few minutes). This automatic unpersisting allows the system to handle this automatically, which reduces memory usage. As a side effect it will also improve GC performance as there are less number of objects stored in memory. In fact, for some workloads, it may allow RDDs to be cached as deserialized, which speeds up processing without too much GC overheads.
This is disabled by default. To enable it set configuration spark.streaming.unpersist to true. In future release, this will be set to true by default.
Also, reduced sleep time in TaskSchedulerImpl.stop() from 5 second to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep for letting messages be sent out be so long.
Improved logic of finding new files in FileInputDStream
Earlier, if HDFS has a hiccup and reports a existence of a new file (mod time T sec) at time T + 1 sec, then fileStream could have missed that file. With this change, it should be able to find files that are delayed by up to <batch size> seconds. That is, even if file is reported at T + <batch time> sec, file stream should be able to catch it.
The new logic, at a high level, is as follows. It keeps track of the new files it found in the previous interval and mod time of the oldest of those files (lets call it X). Then in the current interval, it will ignore those files that were seen in the previous interval and those which have mod time older than X. So if a new file gets reported by HDFS that in the current interval, but has mod time in the previous interval, it will be considered. However, if the mod time earlier than the previous interval (that is, earlier than X), they will be ignored. This is the current limitation, and future version would improve this behavior further.
Also reduced line lengths in DStream to <=100 chars.
The bug was due to a misunderstanding of the activeSetOpt parameter to
Graph.mapReduceTriplets. Passing EdgeDirection.Both causes
mapReduceTriplets to run only on edges with *both* vertices in the
active set. This commit adds EdgeDirection.Either, which causes
mapReduceTriplets to run on edges with *either* vertex in the active
set. This is what connected components needed.