Commit graph

836 commits

Author SHA1 Message Date
zsxwing 50e4634236 [SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains rate controller
Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
```

In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered job 3000ms and 3500ms but the receiver was registered after job 3000ms and 3500ms finished.

So we should make sure the receiver online before running `advanceTimeWithRealDelay(ssc, 2)`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8877 from zsxwing/SPARK-10769.
2015-09-23 01:29:30 -07:00
zsxwing 44c28abf12 [SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping
`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost.

To reproduce it, you can add `Thread.sleep(200)` in this line (69c9c17716/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala (L100)) and run `StreamingContexSuite`.
I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds.

There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console

This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called.

Author: zsxwing <zsxwing@gmail.com>

Closes #8417 from zsxwing/SPARK-10224.
2015-09-23 01:28:02 -07:00
Tathagata Das 5548a25475 [SPARK-10652] [SPARK-10742] [STREAMING] Set meaningful job descriptions for all streaming jobs
Here is the screenshot after adding the job descriptions to threads that run receivers and the scheduler thread running the batch jobs.

## All jobs page
* Added job descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9924165/cda4a372-5cb1-11e5-91ca-d43a32c699e9.png)

## All stages page
* Added stage descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9923814/2cce266a-5cae-11e5-8a3f-dad84d06c50e.png)

## Streaming batch details page
* Added the +details link
![image](https://cloud.githubusercontent.com/assets/663212/9921977/24014a32-5c98-11e5-958e-457b6c38065b.png)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8791 from tdas/SPARK-10652.
2015-09-22 22:44:09 -07:00
Tathagata Das 72869883f1 [SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs
The job group, and job descriptions information is passed through thread local properties, and get inherited by child threads. In case of spark streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense.

1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And its not a valid usecase any way, to cancel a streaming context, call streamingContext.stop()

2. Job description: This is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming.

The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads does not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming, does not change those properties in the thread that called streamingContext.start().

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8781 from tdas/SPARK-10649.
2015-09-21 16:47:52 -07:00
Marcelo Vanzin b42059d2ef Revert "[SPARK-10300] [BUILD] [TESTS] Add support for test tags in run-tests.py."
This reverts commit 8abef21dac.
2015-09-15 13:03:38 -07:00
Marcelo Vanzin 8abef21dac [SPARK-10300] [BUILD] [TESTS] Add support for test tags in run-tests.py.
This change does two things:

- tag a few tests and adds the mechanism in the build to be able to disable those tags,
  both in maven and sbt, for both junit and scalatest suites.
- add some logic to run-tests.py to disable some tags depending on what files have
  changed; that's used to disable expensive tests when a module hasn't explicitly
  been changed, to speed up testing for changes that don't directly affect those
  modules.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #8437 from vanzin/test-tags.
2015-09-15 10:45:02 -07:00
Reynold Xin 09b7e7c198 Update version to 1.6.0-SNAPSHOT.
Author: Reynold Xin <rxin@databricks.com>

Closes #8350 from rxin/1.6.
2015-09-15 00:54:20 -07:00
Sean Owen 4e2242bb41 [SPARK-10576] [BUILD] Move .java files out of src/main/scala
Move .java files in `src/main/scala` to `src/main/java` root, except for `package-info.java` (to stay next to package.scala)

Author: Sean Owen <sowen@cloudera.com>

Closes #8736 from srowen/SPARK-10576.
2015-09-14 15:03:51 -07:00
Sean Owen 22730ad54d [SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order

Author: Sean Owen <sowen@cloudera.com>

Closes #8706 from srowen/SPARK-10547.
2015-09-12 10:40:10 +01:00
Luc Bourlier c1bc4f439f [SPARK-10227] fatal warnings with sbt on Scala 2.11
The bulk of the changes are on `transient` annotation on class parameter. Often the compiler doesn't generate a field for this parameters, so the the transient annotation would be unnecessary.
But if the class parameter are used in methods, then fields are created. So it is safer to keep the annotations.

The remainder are some potential bugs, and deprecated syntax.

Author: Luc Bourlier <luc.bourlier@typesafe.com>

Closes #8433 from skyluc/issue/sbt-2.11.
2015-09-09 09:57:58 +01:00
zsxwing 820913f554 [SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Output a warning when serializing QueueInputDStream rather than throwing an exception to allow unit tests use it. Moreover, this PR also throws an better exception when deserializing QueueInputDStream to make the user find out the problem easily. The previous exception is hard to understand: https://issues.apache.org/jira/browse/SPARK-8553

Author: zsxwing <zsxwing@gmail.com>

Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits:

847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
2015-09-08 20:39:15 -07:00
Reynold Xin 5ffe752b59 [SPARK-9767] Remove ConnectionManager.
We introduced the Netty network module for shuffle in Spark 1.2, and has turned it on by default for 3 releases. The old ConnectionManager is difficult to maintain. If we merge the patch now, by the time it is released, it would be 1 yr for which ConnectionManager is off by default. It's time to remove it.

Author: Reynold Xin <rxin@databricks.com>

Closes #8161 from rxin/SPARK-9767.
2015-09-07 10:42:30 -10:00
xutingjun eafe37236c [SPARK-10311] [STREAMING] Reload appId and attemptId when app starts with checkpoint file in cluster mode
Author: xutingjun <xutingjun@huawei.com>

Closes #8477 from XuTingjun/streaming-attempt.
2015-09-04 15:40:02 -07:00
robbins 754f853b02 [SPARK-9869] [STREAMING] Wait for all event notifications before asserting results
Author: robbins <robbins@uk.ibm.com>

Closes #8589 from robbinspg/InputStreamSuite-fix.
2015-09-03 13:48:35 -07:00
zsxwing 4a5fe09165 [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing <zsxwing@gmail.com>

Closes #8538 from zsxwing/SPARK-10369.
2015-08-31 12:19:11 -07:00
Sean Owen 69c9c17716 [SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`

Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
2015-08-25 12:33:13 +01:00
Tathagata Das 1fc37581a5 [SPARK-10210] [STREAMING] Filter out non-existent blocks before creating BlockRDD
When write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the block do not exists in-memory any more (and cannot be recovered as receiver WAL is not enabled).

This occurs because the driver-side WAL of ReceivedBlockTracker is recovers that past block information, and ReceiveInputDStream creates BlockRDDs even if those blocks do not exist.

The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8405 from tdas/SPARK-10210.
2015-08-25 00:35:51 -07:00
zsxwing f023aa2fcc [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results
This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Let's assume r1 starts at first on `host1` as `scheduleReceivers` suggested,  and try to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`.

2) Assume there are 3 executors (host1, host2, host3) and each executors has 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting, the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle.

This issue can be fixed by returning only executors that have the minimum wight rather than returning at least 3 executors.

Author: zsxwing <zsxwing@gmail.com>

Closes #8340 from zsxwing/fix-receiver-scheduling.
2015-08-24 23:34:50 -07:00
Tathagata Das 7478c8b66d [SPARK-9791] [PACKAGE] Change private class to private class to prevent unnecessary classes from showing up in the docs
In addition, some random cleanup of import ordering

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8387 from tdas/SPARK-9791 and squashes the following commits:

67f3ee9 [Tathagata Das] Change private class to private[package] class to prevent them from showing up in the docs
2015-08-24 12:40:09 -07:00
Tathagata Das 053d94fcf3 [SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following:
1. Use the same code path as Java to check whether a valid checkpoint exists
2. Create a new Python SparkContext only if there no active one.

There is not test for the path as its hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8366 from tdas/SPARK-10142 and squashes the following commits:

3afa666 [Tathagata Das] Added tests
2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists
9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
2015-08-23 19:24:32 -07:00
zsxwing c6df5f66d9 [SPARK-10148] [STREAMING] Display active and inactive receiver numbers in Streaming page
Added the active and inactive receiver numbers in the summary section of Streaming page.

<img width="1074" alt="screen shot 2015-08-21 at 2 08 54 pm" src="https://cloud.githubusercontent.com/assets/1000778/9402437/ff2806a2-480f-11e5-8f8e-efdf8e5d514d.png">

Author: zsxwing <zsxwing@gmail.com>

Closes #8351 from zsxwing/receiver-number.
2015-08-23 17:41:49 -07:00
Tathagata Das b762f9920f [SPARK-10128] [STREAMING] Used correct classloader to deserialize WAL data
Recovering Kinesis sequence numbers from WAL leads to classnotfoundexception because the ObjectInputStream does not use the correct classloader and the SequenceNumberRanges class (in streaming-kinesis-asl package) cannot be found (added through spark-submit) while deserializing. The solution is to use `Thread.currentThread().getContextClassLoader` while deserializing.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8328 from tdas/SPARK-10128 and squashes the following commits:

f19b1c2 [Tathagata Das] Used correct classloader to deserialize WAL data
2015-08-19 21:15:58 -07:00
zsxwing affc8a887e [SPARK-10125] [STREAMING] Fix a potential deadlock in JobGenerator.stop
Because `lazy val` uses `this` lock, if JobGenerator.stop and JobGenerator.doCheckpoint (JobGenerator.shouldCheckpoint has not yet been initialized) run at the same time, it may hang.

Here are the stack traces for the deadlock:

```Java
"pool-1-thread-1-ScalaTest-running-StreamingListenerSuite" #11 prio=5 os_prio=31 tid=0x00007fd35d094800 nid=0x5703 in Object.wait() [0x000000012ecaf000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x00000007b5d8d7f8> (a org.apache.spark.util.EventLoop$$anon$1)
        at java.lang.Thread.join(Thread.java:1319)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
        at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:155)
        - locked <0x00000007b5d8cea0> (a org.apache.spark.streaming.scheduler.JobGenerator)
        at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:95)
        - locked <0x00000007b5d8ced8> (a org.apache.spark.streaming.scheduler.JobScheduler)
        at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:687)

"JobGenerator" #67 daemon prio=5 os_prio=31 tid=0x00007fd35c3b9800 nid=0x9f03 waiting for monitor entry [0x0000000139e4a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.streaming.scheduler.JobGenerator.shouldCheckpoint$lzycompute(JobGenerator.scala:63)
        - waiting to lock <0x00000007b5d8cea0> (a org.apache.spark.streaming.scheduler.JobGenerator)
        at org.apache.spark.streaming.scheduler.JobGenerator.shouldCheckpoint(JobGenerator.scala:63)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:290)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
```

I can use this patch to produce this deadlock: 8a88f28d13

And a timeout build in Jenkins due to this deadlock: https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1654/

This PR initializes `checkpointWriter` before `eventLoop` uses it to avoid this deadlock.

Author: zsxwing <zsxwing@gmail.com>

Closes #8326 from zsxwing/SPARK-10125.
2015-08-19 19:43:09 -07:00
Tathagata Das bc9a0e0323 [SPARK-9967] [SPARK-10099] [STREAMING] Renamed conf spark.streaming.backpressure.{enable-->enabled} and fixed deprecated annotations
Small changes
- Renamed conf spark.streaming.backpressure.{enable --> enabled}
- Change Java Deprecated annotations to Scala deprecated annotation with more information.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8299 from tdas/SPARK-9967.
2015-08-18 23:37:57 -07:00
zsxwing 90273eff96 [SPARK-10102] [STREAMING] Fix a race condition that startReceiver may happen before setting trackerState to Started
Test failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/3305/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/

There is a race condition that setting `trackerState` to `Started` could happen after calling `startReceiver`. Then `startReceiver` won't start the receivers because it uses `! isTrackerStarted` to check if ReceiverTracker is stopping or stopped. But actually, `trackerState` is `Initialized` and will be changed to `Started` soon.

Therefore, we should use `isTrackerStopping || isTrackerStopped`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8294 from zsxwing/SPARK-9504.
2015-08-18 20:15:54 -07:00
Tathagata Das 1aeae05bb2 [SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generate blocks fills up to capacity
Generated blocks are inserted into an ArrayBlockingQueue, and another thread pulls stuff from the ArrayBlockingQueue and pushes it into BlockManager. Now if that queue fills up to capacity (default is 10 blocks), then the inserting into queue (done in the function updateCurrentBuffer) get blocked inside a synchronized block. However, the thread that is pulling blocks from the queue uses the same lock to check the current (active or stopped) while pulling from the queue. Since the block generating threads is blocked (as the queue is full) on the lock, this thread that is supposed to drain the queue gets blocked. Ergo, deadlock.

Solution: Moved blocking call to ArrayBlockingQueue outside the synchronized to prevent deadlock.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8257 from tdas/SPARK-10072.
2015-08-18 19:26:38 -07:00
Tathagata Das 9108eff74a [SPARK-10098] [STREAMING] [TEST] Cleanup active context after test in FailureSuite
Failures in streaming.FailureSuite can leak StreamingContext and SparkContext which fails all subsequent tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8289 from tdas/SPARK-10098.
2015-08-18 17:00:13 -07:00
Davies Liu 37586e5449 [HOTFIX] fix duplicated braces
Author: Davies Liu <davies@databricks.com>

Closes #8219 from davies/fix_typo.
2015-08-14 20:56:55 -07:00
Tathagata Das 18a761ef7a [SPARK-9968] [STREAMING] Reduced time spent within synchronized block to prevent lock starvation
When the rate limiter is actually limiting the rate at which data is inserted into the buffer, the synchronized block of BlockGenerator.addData stays blocked for long time. This causes the thread switching the buffer and generating blocks (synchronized with addData) to starve and not generate blocks for seconds. The correct solution is to not block on the rate limiter within the synchronized block for adding data to the buffer.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8204 from tdas/SPARK-9968 and squashes the following commits:

8cbcc1b [Tathagata Das] Removed unused val
a73b645 [Tathagata Das] Reduced time spent within synchronized block
2015-08-14 15:54:14 -07:00
Tathagata Das f3bfb711c1 [SPARK-9966] [STREAMING] Handle couple of corner cases in PIDRateEstimator
1. The rate estimator should not estimate any rate when there are no records in the batch, as there is no data to estimate the rate. In the current state, it estimates and set the rate to zero. That is incorrect.

2. The rate estimator should not never set the rate to zero under any circumstances. Otherwise the system will stop receiving data, and stop generating useful estimates (see reason 1). So the fix is to define a parameters that sets a lower bound on the estimated rate, so that the system always receives some data.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8199 from tdas/SPARK-9966 and squashes the following commits:

829f793 [Tathagata Das] Fixed unit test and added comments
3a994db [Tathagata Das] Added min rate and updated tests in PIDRateEstimator
2015-08-14 15:10:01 -07:00
Michel Lemay ab7e721cfe [SPARK-9826] [CORE] Fix cannot use custom classes in log4j.properties
Refactor Utils class and create ShutdownHookManager.

NOTE: Wasn't able to run /dev/run-tests on windows machine.
Manual tests were conducted locally using custom log4j.properties file with Redis appender and logstash formatter (bundled in the fat-jar submitted to spark)

ex:
log4j.rootCategory=WARN,console,redis
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.spark.graphx.Pregel=INFO

log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender
log4j.appender.redis.endpoints=hostname:port
log4j.appender.redis.key=mykey
log4j.appender.redis.alwaysBatch=false
log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1

Author: michellemay <mlemay@gmail.com>

Closes #8109 from michellemay/SPARK-9826.
2015-08-12 16:41:35 -07:00
Hao Zhu 3c9802d940 [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Spark streaming deletes the temp file and backup files without checking if they exist or not

Author: Hao Zhu <viadeazhu@gmail.com>

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801
2015-08-10 17:17:22 -07:00
Reynold Xin a863348fd8 Disable JobGeneratorSuite "Do not clear received block data too soon". 2015-08-09 13:43:31 -07:00
zsxwing 346209097e [SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler
Because `JobScheduler.stop(false)` may set `eventLoop` to null when `JobHandler` is running, then it's possible that when `post` is called, `eventLoop` happens to null.

This PR fixed this bug and also set threads in `jobExecutor` to `daemon`.

Author: zsxwing <zsxwing@gmail.com>

Closes #7960 from zsxwing/fix-npe and squashes the following commits:

b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler
2015-08-06 14:39:36 -07:00
Tathagata Das 0a078303d0 [SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates
In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied.

In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular.
- Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`)
- Made `RateControllerSuite` faster (by increasing batch interval) and less flaky
- Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places).
- Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7913 from tdas/SPARK-9556 and squashes the following commits:

41d4461 [Tathagata Das] fix scala style
eb9fd59 [Tathagata Das] Updated kinesis receiver
d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator
d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization
f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556
31da173 [Tathagata Das] Fix bug
12116df [Tathagata Das] Add BlockGeneratorSuite
74bd069 [Tathagata Das] Fix style
989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it
3ff618c [Tathagata Das] Fix test
b40eff8 [Tathagata Das] slight refactoring
f0df0f1 [Tathagata Das] Scala style fixes
51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
2015-08-06 14:35:30 -07:00
Tathagata Das c2a71f0714 [SPARK-9217] [STREAMING] Make the kinesis receiver reliable by recording sequence numbers
This PR is the second one in the larger issue of making the Kinesis integration reliable and provide WAL-free at-least once guarantee. It is based on the design doc - https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit

In this PR, I have updated the Kinesis Receiver to do the following.
- Control the block generation, by creating its own BlockGenerator with own callback methods and using it to keep track of the ranges of sequence numbers that go into each block.
- More specifically, as the KinesisRecordProcessor provides small batches of records, the records are atomically inserted into the block (that is, either the whole batch is in the block, or not). Accordingly the sequence number range of the batch is recorded. Since there may be many batches added to a block, the receiver tracks all the range of sequence numbers that is added to a block.
- When the block is ready to be pushed, the block is pushed and the ranges are reported as metadata of the block. In addition, the ranges are used to find out the latest sequence number for each shard that can be checkpointed through the DynamoDB.
- Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence number for it own shard.
- The array of ranges in the block metadata is used to create KinesisBackedBlockRDDs. The ReceiverInputDStream has been slightly refactored to allow the creation of KinesisBackedBlockRDDs instead of the WALBackedBlockRDDs.

Things to be done
- [x] Add new test to verify that the sequence numbers are recovered.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7825 from tdas/kinesis-receiver and squashes the following commits:

2159be9 [Tathagata Das] Fixed bug
569be83 [Tathagata Das] Fix scala style issue
bf31e22 [Tathagata Das] Added more documentation to make the kinesis test endpoint more configurable
3ad8361 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-receiver
c693a63 [Tathagata Das] Removed unnecessary constructor params from KinesisTestUtils
e1f1d0a [Tathagata Das] Addressed PR comments
b9fa6bf [Tathagata Das] Fix serialization issues
f8b7680 [Tathagata Das] Updated doc
33fe43a [Tathagata Das] Added more tests
7997138 [Tathagata Das] Fix style errors
a806710 [Tathagata Das] Fixed unit test and use KinesisInputDStream
40a1709 [Tathagata Das] Fixed KinesisReceiverSuite tests
7e44df6 [Tathagata Das] Added documentation and fixed checkpointing
096383f [Tathagata Das] Added test, and addressed some of the comments.
84a7892 [Tathagata Das] fixed scala style issue
e19e37d [Tathagata Das] Added license
1cd7b66 [Tathagata Das] Updated kinesis receiver
2015-08-05 00:20:26 -07:00
zsxwing d34bac0e15 [SPARK-9504] [STREAMING] [TESTS] Fix o.a.s.streaming.StreamingContextSuite.stop gracefully again
The test failure is here: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3150/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/

There is a race condition in TestReceiver that it may add 1 record and increase `TestReceiver.counter` after stopping `BlockGenerator`. This PR just adds `join` to wait the pushing thread.

Author: zsxwing <zsxwing@gmail.com>

Closes #7934 from zsxwing/SPARK-9504-2 and squashes the following commits:

cfd7973 [zsxwing] Wait for the thread to make sure we won't change TestReceiver.counter after stopping BlockGenerator
2015-08-04 20:09:15 -07:00
Sean Owen 76d74090d6 [SPARK-9534] [BUILD] Enable javac lint for scalac parity; fix a lot of build warnings, 1.5.0 edition
Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.

I'll explain several of the changes inline in comments.

Author: Sean Owen <sowen@cloudera.com>

Closes #7862 from srowen/SPARK-9534 and squashes the following commits:

ea51618 [Sean Owen] Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
2015-08-04 12:02:26 +01:00
Sameer Abhyankar 060c79aab5 [SPARK-9056] [STREAMING] Rename configuration spark.streaming.minRememberDuration to spark.streaming.fileStream.minRememberDuration
Rename configuration `spark.streaming.minRememberDuration` to `spark.streaming.fileStream.minRememberDuration`

Author: Sameer Abhyankar <sabhyankar@sabhyankar-MBP.local>
Author: Sameer Abhyankar <sabhyankar@sabhyankar-MBP.Samavihome>

Closes #7740 from sabhyankar/spark_branch_9056 and squashes the following commits:

d5b2f1f [Sameer Abhyankar] Correct deprecated version to 1.5
1268133 [Sameer Abhyankar] Add {} and indentation
ddf9844 [Sameer Abhyankar] Change 4 space indentation to 2 space indentation
1819b5f [Sameer Abhyankar] Use spark.streaming.fileStream.minRememberDuration property in lieu of spark.streaming.minRememberDuration
2015-07-31 13:08:55 -07:00
zsxwing d046347014 [SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test
The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a super overloaded Jenkins worker, the receiver may be not able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure. That's why `assert(runningCount > 0)` failed.

This PR replaces `awaitTerminationOrTimeout` with `eventually` which should be more reliable.

Author: zsxwing <zsxwing@gmail.com>

Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits:

7af66a6 [zsxwing] Remove wrong assertion
5ba2c99 [zsxwing] Use eventually to fix the flaky test
2015-07-31 12:10:55 -07:00
Iulian Dragos 0a1d2ca42c [SPARK-8979] Add a PID based rate estimator
Based on #7600

/cc tdas

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits:

aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests.
93b74f8 [Iulian Dragos] Better explanation of historicalError.
7975b0c [Iulian Dragos] Add configuration for PID.
26cfd78 [Iulian Dragos] A couple of variable renames.
d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements.
d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator
2015-07-31 12:04:03 -07:00
cody koeninger 9307f5653d [SPARK-9472] [STREAMING] consistent hadoop configuration, streaming only
Author: cody koeninger <cody@koeninger.org>

Closes #7772 from koeninger/streaming-hadoop-config and squashes the following commits:

5267284 [cody koeninger] [SPARK-4229][Streaming] consistent hadoop configuration, streaming only
2015-07-30 17:44:20 -07:00
zsxwing 0dbd6963d5 [SPARK-9479] [STREAMING] [TESTS] Fix ReceiverTrackerSuite failure for maven build and other potential test failures in Streaming
See https://issues.apache.org/jira/browse/SPARK-9479 for the failure cause.

The PR includes the following changes:
1. Make ReceiverTrackerSuite create StreamingContext in the test body.
2. Fix places that don't stop StreamingContext. I verified no SparkContext was stopped in the shutdown hook locally after this fix.
3. Fix an issue that `ReceiverTracker.endpoint` may be null.
4. Make sure stopping SparkContext in non-main thread won't fail other tests.

Author: zsxwing <zsxwing@gmail.com>

Closes #7797 from zsxwing/fix-ReceiverTrackerSuite and squashes the following commits:

3a4bb98 [zsxwing] Fix another potential NPE
d7497df [zsxwing] Fix ReceiverTrackerSuite; make sure StreamingContext in tests is closed
2015-07-30 15:39:46 -07:00
Iulian Dragos 819be46e5a [SPARK-8977] [STREAMING] Defines the RateEstimator interface, and impements the RateController
Based on #7471.

- [x] add a test that exercises the publish path from driver to receiver
- [ ] remove Serializable from `RateController` and `RateEstimator`

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits:

f168c94 [Iulian Dragos] Latest review round.
5125e60 [Iulian Dragos] Fix style.
a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
475e346 [Iulian Dragos] Latest round of reviews.
e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext
715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest.
e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates.
b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.
238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
34a389d [Iulian Dragos] Various style changes and a first test for the rate controller.
d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
2015-07-29 13:47:37 -07:00
Tathagata Das c5ed36953f [STREAMING] [HOTFIX] Ignore ReceiverTrackerSuite flaky test
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7738 from tdas/ReceiverTrackerSuite-hotfix and squashes the following commits:

00f0ee1 [Tathagata Das] ignore flaky test
2015-07-28 16:41:56 -07:00
zsxwing daa1964b60 [SPARK-8882] [STREAMING] Add a new Receiver scheduling mechanism
The design doc: https://docs.google.com/document/d/1ZsoRvHjpISPrDmSjsGzuSu8UjwgbtmoCTzmhgTurHJw/edit?usp=sharing

Author: zsxwing <zsxwing@gmail.com>

Closes #7276 from zsxwing/receiver-scheduling and squashes the following commits:

137b257 [zsxwing] Add preferredNumExecutors to rescheduleReceiver
61a6c3f [zsxwing] Set state to ReceiverState.INACTIVE in deregisterReceiver
5e1fa48 [zsxwing] Fix the code style
7451498 [zsxwing] Move DummyReceiver back to ReceiverTrackerSuite
715ef9c [zsxwing] Rename: scheduledLocations -> scheduledExecutors; locations -> executors
05daf9c [zsxwing] Use receiverTrackingInfo.toReceiverInfo
1d6d7c8 [zsxwing] Merge branch 'master' into receiver-scheduling
8f93c8d [zsxwing] Use hostPort as the receiver location rather than host; fix comments and unit tests
59f8887 [zsxwing] Schedule all receivers at the same time when launching them
075e0a3 [zsxwing] Add receiver RDD name; use '!isTrackerStarted' instead
276a4ac [zsxwing] Remove "ReceiverLauncher" and move codes to "launchReceivers"
fab9a01 [zsxwing] Move methods back to the outer class
4e639c4 [zsxwing] Fix unintentional changes
f60d021 [zsxwing] Reorganize ReceiverTracker to use an event loop for lock free
105037e [zsxwing] Merge branch 'master' into receiver-scheduling
5fee132 [zsxwing] Update tha scheduling algorithm to avoid to keep restarting Receiver
9e242c8 [zsxwing] Remove the ScheduleReceiver message because we can refuse it when receiving RegisterReceiver
a9acfbf [zsxwing] Merge branch 'squash-pr-6294' into receiver-scheduling
881edb9 [zsxwing] ReceiverScheduler -> ReceiverSchedulingPolicy
e530bcc [zsxwing] [SPARK-5681][Streaming] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time #6294
3b87e4a [zsxwing] Revert SparkContext.scala
a86850c [zsxwing] Remove submitAsyncJob and revert JobWaiter
f549595 [zsxwing] Add comments for the scheduling approach
9ecc08e [zsxwing] Fix comments and code style
28d1bee [zsxwing] Make 'host' protected; rescheduleReceiver -> getAllowedLocations
2c86a9e [zsxwing] Use tryFailure to support calling jobFailed multiple times
ca6fe35 [zsxwing] Add a test for Receiver.restart
27acd45 [zsxwing] Add unit tests for LoadBalanceReceiverSchedulerImplSuite
cc76142 [zsxwing] Add JobWaiter.toFuture to avoid blocking threads
d9a3e72 [zsxwing] Add a new Receiver scheduling mechanism
2015-07-27 17:59:43 -07:00
Marcelo Vanzin 8399ba1487 [SPARK-9261] [STREAMING] Avoid calling APIs that expose shaded classes.
Doing this may cause weird errors when tests are run on maven, depending
on the flags used. Instead, expose the needed functionality through methods
that do not expose shaded classes.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7601 from vanzin/SPARK-9261 and squashes the following commits:

4f64a16 [Marcelo Vanzin] [SPARK-9261] [streaming] Avoid calling APIs that expose shaded classes.
2015-07-24 11:53:16 -07:00
Iulian Dragos 798dff7b4b [SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398).

tdas huitseeker

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits:

8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
2015-07-22 15:54:08 -07:00
zsxwing ad0954f6de [SPARK-5681] [STREAMING] Move 'stopReceivers' to the event loop to resolve the race condition
This is an alternative way to fix `SPARK-5681`. It minimizes the changes.

Closes #4467

Author: zsxwing <zsxwing@gmail.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #6294 from zsxwing/pr4467 and squashes the following commits:

709ac1f [zsxwing] Fix the comment
e103e8a [zsxwing] Move ReceiverTracker.stop into ReceiverTracker.stop
f637142 [zsxwing] Address minor code style comments
a178d37 [zsxwing] Move 'stopReceivers' to the event looop to resolve the race condition
51fb07e [zsxwing] Fix the code style
3cb19a3 [zsxwing] Merge branch 'master' into pr4467
b4c29e7 [zsxwing] Stop receiver only if we start it
c41ee94 [zsxwing] Make stopReceivers private
7c73c1f [zsxwing] Use trackerStateLock to protect trackerState
a8120c0 [zsxwing] Merge branch 'master' into pr4467
7b1d9af [zsxwing] "case Throwable" => "case NonFatal"
15ed4a1 [zsxwing] Register before starting the receiver
fff63f9 [zsxwing] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time.
e0ef72a [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
19b76d9 [Liang-Chi Hsieh] Remove timeout.
34c18dc [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
c419677 [Liang-Chi Hsieh] Fix style.
9e1a760 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
355f9ce [Liang-Chi Hsieh] Separate register and start events for receivers.
3d568e8 [Liang-Chi Hsieh] Let receivers get registered first before going started.
ae0d9fd [Liang-Chi Hsieh] Merge branch 'master' into tracker_status_timeout
77983f3 [Liang-Chi Hsieh] Add tracker status and stop to receive messages when stopping tracker.
2015-07-17 14:00:31 -07:00
jerryshao 031d7d4143 [SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.
Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue
2015-07-16 16:55:46 -07:00