This PR is a fixed version of the original PR #3237 by watermen and scwf.
This adds the ability to specify how many elements to print in `DStream.print`.
Author: Yadong Qi <qiyadong2010@gmail.com>
Author: q00251598 <qiyadong@huawei.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: wangfei <wangfei1@huawei.com>
Closes#3865 from tdas/print-num and squashes the following commits:
cd34e9e [Tathagata Das] Fix bug
7c09f16 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into HEAD
bb35d1a [Yadong Qi] Update MimaExcludes.scala
f8098ca [Yadong Qi] Update MimaExcludes.scala
f6ac3cb [Yadong Qi] Update MimaExcludes.scala
e4ed897 [Yadong Qi] Update MimaExcludes.scala
3b9d5cf [wangfei] fix conflicts
ec8a3af [q00251598] move to Spark 1.3
26a70c0 [q00251598] extend the Python DStream's print
b589a4b [q00251598] add another print function
Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:
If you attempt to send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, then this seems to lead to more-or-less silent failures. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but these will cause mostly-silent failures when running on a real cluster.
Before this patch, here was the code for ReceiverMessage:
```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```
Since ReceiverMessage does not extend Serializable and StopReceiver is a regular `object`, not a `case object`, StopReceiver will throw serialization errors. As a result, graceful receiver shutdown is broken on real clusters (and local-cluster mode) but works in local modes. If you want to reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:
```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(10000)
ssc.stop(true, true)
```
Prior to this patch, this would work correctly in local mode but fail when running against a real cluster (it would report that some receivers were not shut down).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#3857 from JoshRosen/SPARK-5035 and squashes the following commits:
71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.
...s to get deleted before continuing.
Since the deletes are happening asynchronously, the getFileStatus call might throw an exception in older HDFS
versions, if the delete happens between the time listFiles is called on the directory and getFileStatus is called
on the file in the getFileStatus method.
This PR addresses this by adding an option to delete the files synchronously and then waiting for the deletion to
complete before proceeding.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes#3726 from harishreedharan/spark-4790 and squashes the following commits:
bbbacd1 [Hari Shreedharan] Call cleanUpOldLogs only once in the tests.
3255f17 [Hari Shreedharan] Add test for async deletion. Remove method from ReceiverTracker that does not take waitForCompletion.
e4c83ec [Hari Shreedharan] Making waitForCompletion a mandatory param. Remove eventually from WALSuite since the cleanup method returns only after all files are deleted.
af00fd1 [Hari Shreedharan] [SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.
Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout.
Author: zsxwing <zsxwing@gmail.com>
Closes#3661 from zsxwing/SPARK-4813 and squashes the following commits:
52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
Author: CodingCat <zhunansjtu@gmail.com>
Closes#3807 from CodingCat/new_branch and squashes the following commits:
5167f01 [CodingCat] fix typo in the comment
There is only one implicit function `toPairDStreamFunctions` in `StreamingContext`. This PR did similar reorganization like [SPARK-4397](https://issues.apache.org/jira/browse/SPARK-4397).
Compiled the following codes with Spark Streaming 1.1.0 and ran it with this PR. Everything is fine.
```Scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
object StreamingApp {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.textFileStream("/some/path")
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
```
Author: zsxwing <zsxwing@gmail.com>
Closes#3464 from zsxwing/SPARK-4608 and squashes the following commits:
aa6d44a [zsxwing] Fix a copy-paste error
f74c190 [zsxwing] Merge branch 'master' into SPARK-4608
e6f9cc9 [zsxwing] Update the docs
27833bb [zsxwing] Remove `import StreamingContext._`
c15162c [zsxwing] Reorganize StreamingContext implicit to improve API convenience
Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting.
Author: jerryshao <saisai.shao@intel.com>
Closes#3466 from jerryshao/SPARK-4537 and squashes the following commits:
00f5f7f [jerryshao] Change the code style and add totalProcessedRecords
44721a6 [jerryshao] Further address the comments
c097ddc [jerryshao] Address the comments
02dd44f [jerryshao] Fix the addressed comments
c7a9376 [jerryshao] Expand StreamingSource to add more metrics
Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one.
Author: zsxwing <zsxwing@gmail.com>
Closes#3721 from zsxwing/SPARK-4873 and squashes the following commits:
46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop)
Currently streaming block will be replicated when specific storage level is set, since WAL is already fault tolerant, so replication is needless and will hurt the throughput of streaming application.
Hi tdas , as per discussed about this issue, I fixed with this implementation, I'm not is this the way you want, would you mind taking a look at it? Thanks a lot.
Author: jerryshao <saisai.shao@intel.com>
Closes#3534 from jerryshao/SPARK-4671 and squashes the following commits:
500b456 [jerryshao] Do not replicate streaming block when WAL is enabled
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to
remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`.
Author: Ilayaperumal Gopinathan <igopinathan@pivotal.io>
Closes#3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:
6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered
Solves two JIRAs in one shot
- Makes the ForechDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used saveAsNewAPIHadoopFiles be the Spark's hadoop configuration
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#3457 from tdas/savefiles-fix and squashes the following commits:
bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.
When running the NetworkWordCount, the description of the word count jobs are set as "getCallsite at DStream:xxx" . This should be set to the line number of the streaming application that has the output operation that led to the job being created. This is because the callsite is incorrectly set in the thread launching the jobs. This PR fixes that.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#3455 from tdas/streaming-callsite-fix and squashes the following commits:
69fc26f [Tathagata Das] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI
Author: jerryshao <saisai.shao@intel.com>
Closes#3244 from jerryshao/SPARK-4381 and squashes the following commits:
d2486c7 [jerryshao] Improve the warning log
d726e85 [jerryshao] Add local[1] to the filter condition
eca428b [jerryshao] Add warning log
change `NetworkInputDStream` to `ReceiverInputDStream`
change `ReceiverInputTracker` to `ReceiverTracker`
Author: q00251598 <qiyadong@huawei.com>
Closes#3400 from watermen/fix-comments and squashes the following commits:
75d795c [q00251598] change 'NetworkInputDStream' to 'ReceiverInputDStream' && change 'ReceiverInputTracker' to 'ReceiverTracker'
Because of a corner case, a file already selected for batch t can get considered again for batch t+2. This refactoring fixes it by remembering all the files selected in the last 1 minute, so that this corner case does not arise. Also uses spark context's hadoop configuration to access the file system API for listing directories.
pwendell Please take look. I still have not run long-running integration tests, so I cannot say for sure whether this has indeed solved the issue. You could do a first pass on this in the meantime.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#3419 from tdas/filestream-fix2 and squashes the following commits:
c19dd8a [Tathagata Das] Addressed PR comments.
513b608 [Tathagata Das] Updated docs.
d364faf [Tathagata Das] Added the current time condition back
5526222 [Tathagata Das] Removed unnecessary imports.
38bb736 [Tathagata Das] Fix long line.
203bbc7 [Tathagata Das] Un-ignore tests.
eaef4e1 [Tathagata Das] Fixed SPARK-4519
9dbd40a [Tathagata Das] Refactored FileInputDStream to remember last few batches.
In class TransformedDStream:
```scala
require(parents.length > 0, "List of DStreams to transform is empty")
require(parents.map(.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
require(parents.map(.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
```
In class UnionDStream:
```scala
if (parents.length == 0)
{ throw new IllegalArgumentException("Empty array of parents") }
if (parents.map(.ssc).distinct.size > 1)
{ throw new IllegalArgumentException("Array of parents have different StreamingContexts") }
if (parents.map(.slideDuration).distinct.size > 1)
{ throw new IllegalArgumentException("Array of parents have different slide times") }
```
The function is the same, but the realization is not. I think they shoule be the same.
Author: Yadong Qi <qiyadong2010@gmail.com>
Closes#3152 from watermen/bug-fix1 and squashes the following commits:
ed66db6 [Yadong Qi] Change transform to union
b6b3b8b [Yadong Qi] The same function should have the same realization.
Removed `If `this` function returns None, then corresponding state key-value pair will be eliminated.` for the description of `updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]`
Author: zsxwing <zsxwing@gmail.com>
Closes#3356 from zsxwing/SPARK-4481 and squashes the following commits:
76a9891 [zsxwing] Add a note that keys may be added or removed
0ebc42a [zsxwing] Fix the wrong description of updateFunc
The write ahead log of ReceivedBlockTracker gets enabled as soon as checkpoint directory is set. This should not happen, as the WAL should be enabled only if the WAL is enabled in the Spark configuration.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#3358 from tdas/SPARK-4482 and squashes the following commits:
b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker
Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).
Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Saisai Shao <saisai.shao@intel.com>
Closes#2991 from jerryshao/kafka-refactor and squashes the following commits:
5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver
SPARK-3660 : Initial RDD for updateStateByKey transformation
I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount.
Please let me know if any changes are required.
Author: Soumitra Kumar <kumar.soumitra@gmail.com>
Closes#2665 from soumitrak/master and squashes the following commits:
ee8980b [Soumitra Kumar] Fixed copy/paste issue.
304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test.
9781135 [Soumitra Kumar] Fixed test, and renamed variable.
3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream.
2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
d4fdd18 [Soumitra Kumar] Renamed variable and moved method.
d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
Replace some 'if-else' statement by math.min and math.max in Clock.scala
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes#3088 from SaintBacchus/StreamingClock and squashes the following commits:
7b7f8e7 [huangzhaowei] [Streaming][Minor]Replace some 'if-else' in Clock
about convert files to RDDS there are 3 loops with files sequence in spark source.
loops files sequence:
1.files.map(...)
2.files.zip(fileRDDs)
3.files-size.foreach
It's will very time consuming when lots of files.So I do the following correction:
3 loops with files sequence => only one loop
Author: surq <surq@asiainfo.com>
Closes#2811 from surq/SPARK-3954 and squashes the following commits:
321bbe8 [surq] updated the code style.The style from [for...yield]to [files.map(file=>{})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954
178066f [surq] modify code's style. [Exceeds 100 columns]
626ef97 [surq] remove redundant import(ArrayBuffer)
739341f [surq] promote the speed of convert files to RDDS
In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been started is a no-op which has no side-effects. This allows users to call `stop()` on a fresh StreamingContext followed by `start()`. I believe that this almost always indicates an error and is not behavior that we should support. Since we don't allow `start() stop() start()` then I don't think it makes sense to allow `stop() start()`.
The current behavior can lead to resource leaks when StreamingContext constructs its own SparkContext: if I call `stop(stopSparkContext=True)`, then I expect StreamingContext's underlying SparkContext to be stopped irrespective of whether the StreamingContext has been started. This is useful when writing unit test fixtures.
Prior discussions:
- https://github.com/apache/spark/pull/3053#discussion-diff-19710333R490
- https://github.com/apache/spark/pull/3121#issuecomment-61927353
Author: Josh Rosen <joshrosen@databricks.com>
Closes#3160 from JoshRosen/SPARK-4301 and squashes the following commits:
dbcc929 [Josh Rosen] Address more review comments
bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before.
03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already been called.
832a7f4 [Josh Rosen] Address review comment
5142517 [Josh Rosen] Add tests; improve Scaladoc.
813e471 [Josh Rosen] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49
5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet.
As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub task of modifying the streaming driver to reliably save received block metadata, and recover them on driver restart.
This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`, and any actions on them (e.g, allocating blocks to batches, etc.). All actions to block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replaying to recreate the pre-failure state of the `ReceivedBlockTracker`, which include the batch-to-block allocations and the unallocated blocks.
Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s gets recreated with the recovered info, and jobs submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.
This is still a WIP. Things that are missing here are.
- *End-to-end integration tests:* Unit tests that tests the driver recovery, by killing and restarting the streaming context, and verifying all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): https://github.com/tdas/spark/pull/25 I can either include it in this PR, or submit that as a separate PR after this gets in.
- *WAL cleanup:* Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#3026 from tdas/driver-ha-rbt and squashes the following commits:
a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7fb [Tathagata Das] Transferred changes from driver-ha-working branch
As part of the initiative of preventing data loss on streaming driver failure, this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can either read data from the Spark's BlockManager, or read the data from file-segments in write ahead log in HDFS.
Most of this code has been written by @harishreedharan
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes#2931 from tdas/driver-ha-rdd and squashes the following commits:
209e49c [Tathagata Das] Better fix to style issue.
4a5866f [Tathagata Das] Addressed one more comment.
ed5fbf0 [Tathagata Das] Minor updates.
b0a18b1 [Tathagata Das] Fixed import order.
20aa7c6 [Tathagata Das] Fixed more line length issues.
29aa099 [Tathagata Das] Fixed line length issues.
9e47b5b [Tathagata Das] Renamed class, simplified+added unit tests.
6e1bfb8 [Tathagata Das] Tweaks testuite to create spark contxt lazily to prevent contxt leaks.
9c86a61 [Tathagata Das] Merge pull request #22 from harishreedharan/driver-ha-rdd
2878c38 [Hari Shreedharan] Shutdown spark context after tests. Formatting/minor fixes
c709f2f [Tathagata Das] Merge pull request #21 from harishreedharan/driver-ha-rdd
5cce16f [Hari Shreedharan] Make sure getBlockLocations uses offset and length to find the blocks on HDFS
eadde56 [Tathagata Das] Transferred HDFSBackedBlockRDD for the driver-ha-working branch
As part of the initiative to prevent data loss on streaming driver failure, this JIRA tracks the subtask of implementing a ReceivedBlockHandler, that abstracts the functionality of storage of received data blocks. The default implementation will maintain the current behavior of storing the data into BlockManager. The optional implementation will store the data to both BlockManager as well as a write ahead log.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#2940 from tdas/driver-ha-rbh and squashes the following commits:
78a4aaa [Tathagata Das] Fixed bug causing test failures.
f192f47 [Tathagata Das] Fixed import order.
df5f320 [Tathagata Das] Updated code to use ReceivedBlockStoreResult as the return type for handler's storeBlock
33c30c9 [Tathagata Das] Added license, and organized imports.
2f025b3 [Tathagata Das] Updates based on PR comments.
18aec1e [Tathagata Das] Moved ReceivedBlockInfo back into spark.streaming.scheduler package
95a4987 [Tathagata Das] Added ReceivedBlockHandler and its associated tests
If classes implementing Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, then this results in an unhelpful
"IOException: unexpected exception type" rather than the actual exception that
produced the (de)serialization error.
This patch fixes this by adding a utility method that re-wraps any uncaught
exceptions in IOException (unless they are already instances of IOException).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2932 from JoshRosen/SPARK-4080 and squashes the following commits:
cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].
As part of the effort to avoid data loss on Spark Streaming driver failure, we want to implement a write ahead log that can write received data to HDFS. This allows the received data to be persist across driver failures. So when the streaming driver is restarted, it can find and reprocess all the data that were received but not processed.
This was primarily implemented by @harishreedharan. This is still WIP, as he is going to improve the unitests by using HDFS mini cluster.
Author: Hari Shreedharan <hshreedharan@apache.org>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#2882 from tdas/driver-ha-wal and squashes the following commits:
e4bee20 [Tathagata Das] Removed synchronized, Path.getFileSystem is threadsafe
55514e2 [Tathagata Das] Minor changes based on PR comments.
d29fddd [Tathagata Das] Merge pull request #20 from harishreedharan/driver-ha-wal
a317a4d [Hari Shreedharan] Directory deletion should not fail tests
9514dc8 [Tathagata Das] Added unit tests to test reading of corrupted data and other minor edits
3881706 [Tathagata Das] Merge pull request #19 from harishreedharan/driver-ha-wal
4705fff [Hari Shreedharan] Sort listed files by name. Use local files for WAL tests.
eb356ca [Tathagata Das] Merge pull request #18 from harishreedharan/driver-ha-wal
82ce56e [Hari Shreedharan] Fix file ordering issue in WALManager tests
5ff90ee [Hari Shreedharan] Fix tests to not ignore ordering and also assert all data is present
ef8db09 [Tathagata Das] Merge pull request #17 from harishreedharan/driver-ha-wal
7e40e56 [Hari Shreedharan] Restore old build directory after tests
587b876 [Hari Shreedharan] Fix broken test. Call getFileSystem only from synchronized method.
b4be0c1 [Hari Shreedharan] Remove unused method
edcbee1 [Hari Shreedharan] Tests reading and writing data using writers now use Minicluster.
5c70d1f [Hari Shreedharan] Remove underlying stream from the WALWriter.
4ab602a [Tathagata Das] Refactored write ahead stuff from streaming.storage to streaming.util
b06be2b [Tathagata Das] Adding missing license.
5182ffb [Hari Shreedharan] Added documentation
172358d [Tathagata Das] Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working
Author: Holden Karau <holden@pigscanfly.ca>
Closes#2861 from holdenk/SPARK-4015-Documentation-in-the-streaming-context-references-non-existent-function and squashes the following commits:
081db8a [Holden Karau] fix pyspark streaming doc too
0e03863 [Holden Karau] replace awaitTransformation with awaitTermination
SparkEnv is cached in ThreadLocal object, so after stop and create a new SparkContext, old SparkEnv is still used by some threads, it will trigger many problems, for example, pyspark will have problem after restart SparkContext, because py4j use thread pool for RPC.
This patch will clear all the references after stop a SparkEnv.
cc mateiz tdas pwendell
Author: Davies Liu <davies.liu@gmail.com>
Closes#2624 from davies/env and squashes the following commits:
a69f30c [Davies Liu] deprecate getThreadLocal
ba77ca4 [Davies Liu] remove getThreadLocal(), update docs
ee62bb7 [Davies Liu] cleanup ThreadLocal of SparnENV
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop
(HT @vanzin) Whatever the reason was for having this test class in `main`, if there is one, appear to be moot. This may have been a result of earlier streaming test reorganization.
This simply puts `MasterFailureTest` back under `test/`, removes some redundant copied code, and touches up a few tiny inspection warnings along the way.
Author: Sean Owen <sowen@cloudera.com>
Closes#2399 from srowen/SPARK-2932 and squashes the following commits:
3909411 [Sean Owen] Move MasterFailureTest to src/test, and remove redundant TestOutputStream
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.
Author: Aaron Staple <aaron.staple@gmail.com>
Closes#1395 from staple/SPARK-546 and squashes the following commits:
1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
This is a refactored version of the original PR https://github.com/apache/spark/pull/1723 my mubarak
Please take a look andrewor14, mubarak
Author: Mubarak Seyed <mubarak.seyed@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#2464 from tdas/streaming-callsite and squashes the following commits:
dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI
tdas is this what you had in mind for this JIRA? I saw this one and thought it would be easy to take care of, and helpful as I use streaming from Java.
I could do the same for `Time`? Happy to do so.
Author: Sean Owen <sowen@cloudera.com>
Closes#2403 from srowen/SPARK-2745 and squashes the following commits:
5a9e706 [Sean Owen] Change "Duration" to "Durations" to avoid changing Duration case class API
bda301c [Sean Owen] Just delegate to Scala binary operator syntax to avoid scalastyle warning
7dde949 [Sean Owen] Disable scalastyle for false positives. Add Java static factory methods seconds(), minutes() to Duration. Add Java-friendly methods to Time too, and unit tests. Remove unnecessary math.floor from Time.floor()
4dee32e [Sean Owen] Add named methods to Duration in parallel to symbolic methods for Java-friendliness. Also add unit tests for Duration, in Scala and Java.
... that expose a stop() lifecycle method. This doesn't add `AutoCloseable`, which is Java 7+ only. But it should be possible to use try-with-resources on a `Closeable` in Java 7, as long as the `close()` does not throw a checked exception, and these don't. Q.E.D.
Author: Sean Owen <sowen@cloudera.com>
Closes#2346 from srowen/SPARK-3470 and squashes the following commits:
612c21d [Sean Owen] Add Closeable / close() to Java context objects that expose a stop() lifecycle method
We currently open many ephemeral ports during the tests, and as a result we occasionally can't bind to new ones. This has caused the `DriverSuite` and the `SparkSubmitSuite` to fail intermittently.
By disabling the `SparkUI` when it's not needed, we already cut down on the number of ports opened significantly, on the order of the number of `SparkContexts` ever created. We must keep it enabled for a few tests for the UI itself, however.
Author: Andrew Or <andrewor14@gmail.com>
Closes#2363 from andrewor14/disable-ui-for-tests and squashes the following commits:
332a7d5 [Andrew Or] No need to set spark.ui.port to 0 anymore
30c93a2 [Andrew Or] Simplify streaming UISuite
a431b84 [Andrew Or] Fix streaming test failures
8f5ae53 [Andrew Or] Fix no new line at the end
29c9b5b [Andrew Or] Disable SparkUI for tests
Comment of the storageLevel param of actorStream says that it defaults to memory-only while the default is MEMORY_AND_DISK_SER_2.
Author: Mario Pastorelli <pastorelli.mario@gmail.com>
Closes#2319 from melrief/master and squashes the following commits:
7b6ce68 [Mario Pastorelli] [Docs] actorStream storageLevel default is MEMORY_AND_DISK_SER_2
As of #1777 we log the name of the actor system when it binds to a port. The current name "spark" is super general and does not convey any meaning. For instance, the following line is taken from my driver log after setting `spark.driver.port` to 5001.
```
14/08/13 19:33:29 INFO Remoting: Remoting started; listening on addresses:
[akka.tcp://sparkandrews-mbp:5001]
14/08/13 19:33:29 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkandrews-mbp:5001]
14/08/06 13:40:05 INFO Utils: Successfully started service 'spark' on port 5001.
```
This commit renames this to "sparkDriver" and "sparkExecutor". The goal of this unambitious PR is simply to make the logged information more explicit without introducing any change in functionality.
Author: Andrew Or <andrewor14@gmail.com>
Closes#1810 from andrewor14/service-name and squashes the following commits:
8c459ed [Andrew Or] Use a common variable for driver/executor actor system names
3a92843 [Andrew Or] Change actor name to sparkDriver and sparkExecutor
921363e [Andrew Or] Merge branch 'master' of github.com:apache/spark into service-name
c8c6a62 [Andrew Or] Do not include hyphens in actor name
1c1b42e [Andrew Or] Avoid spaces in akka system name
f644b55 [Andrew Or] Use more specific service name
Instead of keeping copies in all pages, just reference the values
kept in the base SparkUI instance (by making them available via
getters).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#1252 from vanzin/SPARK-2169 and squashes the following commits:
4412fc6 [Marcelo Vanzin] Simplify UIUtils.headerSparkPage signature.
4e5d35a [Marcelo Vanzin] [SPARK-2169] Don't copy appName / basePath everywhere.
Not supported in Scala 2.11. Split them into separate methods instead.
Author: Anand Avati <avati@redhat.com>
Closes#1704 from avati/SPARK-1812-default-args and squashes the following commits:
3e3924a [Anand Avati] SPARK-1812: Add Mima excludes for the broken ABI
901dfc7 [Anand Avati] SPARK-1812: core - Fix overloaded methods with default arguments
07f00af [Anand Avati] SPARK-1812: streaming - Fix overloaded methods with default arguments
- Added override.
- Marked some variables as private.
Author: Reynold Xin <rxin@apache.org>
Closes#1943 from rxin/metricsSource and squashes the following commits:
fbfa943 [Reynold Xin] Minor cleanup of metrics.Source. - Added override. - Marked some variables as private.
When standalone Workers launch executors, they inherit the Spark home set by the driver. This means if the worker machines do not share the same directory structure as the driver node, the Workers will attempt to run scripts (e.g. bin/compute-classpath.sh) that do not exist locally and fail. This is a common scenario if the driver is launched from outside of the cluster.
The solution is to simply not pass the driver's Spark home to the Workers. This PR further makes an attempt to avoid overloading the usages of `spark.home`, which is now only used for setting executor Spark home on Mesos and in python.
This is based on top of #1392 and originally reported by YanTangZhai. Tested on standalone cluster.
Author: Andrew Or <andrewor14@gmail.com>
Closes#1734 from andrewor14/spark-home-reprise and squashes the following commits:
f71f391 [Andrew Or] Revert changes in python
1c2532c [Andrew Or] Merge branch 'master' of github.com:apache/spark into spark-home-reprise
188fc5d [Andrew Or] Avoid using spark.home where possible
09272b7 [Andrew Or] Always use Worker's working directory as spark home
Author: joyyoj <sunshch@gmail.com>
Closes#1694 from joyyoj/SPARK-2379 and squashes the following commits:
d73790d [joyyoj] SPARK-2379 Fix the bug that streaming's receiver may fall into a dead loop
22e7821 [joyyoj] Merge remote-tracking branch 'apache/master'
3f4a602 [joyyoj] Merge remote-tracking branch 'remotes/apache/master'
f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause for OOMs especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.
**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.
**New configurations.**
- `spark.storage.bufferFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default 0.9)
For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.
Author: Andrew Or <andrewor14@gmail.com>
Closes#1165 from andrewor14/them-rdd-memories and squashes the following commits:
e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests
Our program needs to receive a large amount of data and run for a long
time.
We set the log level to WARN but "Storing iterator" "received single"
as such message written to the log file. (over yarn)
Author: fireflyc <fireflyc@126.com>
Closes#1372 from fireflyc/fix-replace-stdout-log and squashes the following commits:
e684140 [fireflyc] 'info' modified into the 'debug'
fa22a38 [fireflyc] replace println to log4j
JIRA: https://issues.apache.org/jira/browse/SPARK-2657
Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 for the pointers in there, plus at least 32 for the ArrayBuffer data structure. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in fields of itself) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers.
There are some changes throughout the code to deal with CoGroupedRDD returning Array instead. We can also decide not to do that but CoGroupedRDD is a `DeveloperAPI` so I think it's okay to change it here.
Author: Matei Zaharia <matei@databricks.com>
Closes#1555 from mateiz/compact-groupby and squashes the following commits:
845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8
07621a7 [Matei Zaharia] Review comments
0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply
bdc8a39 [Matei Zaharia] Small tweak to +=, and typos
f61f040 [Matei Zaharia] Fix line lengths
59da88b0 [Matei Zaharia] Fix line lengths
197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient
775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers
9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD
ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey
10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers
explicit return type for implicit function
Author: James Z.M. Gao <gaozhm@mediav.com>
Closes#153 from gzm55/work/streaming-compile and squashes the following commits:
11e9c8d [James Z.M. Gao] fix style error
fe88109 [James Z.M. Gao] fix compile error of streaming project