Commit graph

753 commits

Author SHA1 Message Date
Andrew Or b93c97d79b [SPARK-7501] [STREAMING] DAG visualization: show DStream operations
This is similar to #5999, but for streaming. Roughly 200 lines are tests.

One thing to note here is that we already do some kind of scoping thing for call sites, so this patch adds the new RDD operation scoping logic in the same place. Also, this patch adds a `try finally` block to set the relevant variables in a safer way.

tdas zsxwing

------------------------
**Before**
<img src="https://cloud.githubusercontent.com/assets/2133137/7625996/d88211b8-f9b4-11e4-90b9-e11baa52d6d7.png" width="450px"/>

--------------------------
**After**
<img src="https://cloud.githubusercontent.com/assets/2133137/7625997/e0878f8c-f9b4-11e4-8df3-7dd611b13c87.png" width="650px"/>

Author: Andrew Or <andrew@databricks.com>

Closes #6034 from andrewor14/dag-viz-streaming and squashes the following commits:

932a64a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e685df9 [Andrew Or] Rename createRDDWith
84d0656 [Andrew Or] Review feedback
697c086 [Andrew Or] Fix tests
53b9936 [Andrew Or] Set scopes for foreachRDD properly
1881802 [Andrew Or] Refactor DStream scope names again
af4ba8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
fd07d22 [Andrew Or] Make MQTT lower case
f6de871 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
0ca1801 [Andrew Or] Remove a few unnecessary withScopes on aliases
fa4e5fb [Andrew Or] Pass in input stream name rather than defining it from within
1af0b0e [Andrew Or] Fix style
074c00b [Andrew Or] Review comments
d25a324 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e4a93ac [Andrew Or] Fix tests?
25416dc [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
9113183 [Andrew Or] Add tests for DStream scopes
b3806ab [Andrew Or] Fix test
bb80bbb [Andrew Or] Fix MIMA?
5c30360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
5703939 [Andrew Or] Rename operations that create InputDStreams
7c4513d [Andrew Or] Group RDDs by DStream operations and batches
bf0ab6e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
05c2676 [Andrew Or] Wrap many more methods in withScope
c121047 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
65ef3e9 [Andrew Or] Fix NPE
a0d3263 [Andrew Or] Scope streaming operations instead of RDD operations
2015-05-18 14:33:33 -07:00
zsxwing 0b6f503d53 [SPARK-7658] [STREAMING] [WEBUI] Update the mouse behaviors for the timeline graphs
1. If the user click one point of a batch, scroll down to the corresponding batch row and highlight it. And recovery the batch row after 3 seconds if necessary.

2. Add "#batches" in the histogram graphs.

![screen shot 2015-05-14 at 7 36 19 pm](https://cloud.githubusercontent.com/assets/1000778/7646108/84f4a014-fa73-11e4-8c13-1903d267e60f.png)

![screen shot 2015-05-14 at 7 36 53 pm](https://cloud.githubusercontent.com/assets/1000778/7646109/8b11154a-fa73-11e4-820b-8ece9fa6ee3e.png)

![screen shot 2015-05-14 at 7 36 34 pm](https://cloud.githubusercontent.com/assets/1000778/7646111/93828272-fa73-11e4-89f8-580670144d3c.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #6168 from zsxwing/SPARK-7658 and squashes the following commits:

c242b00 [zsxwing] Change 5 seconds to 3 seconds
31fd0aa [zsxwing] Remove the mouseover highlight feature
06c6f6f [zsxwing] Merge branch 'master' into SPARK-7658
2eaff06 [zsxwing] Merge branch 'master' into SPARK-7658
108d56c [zsxwing] Update the mouse behaviors for the timeline graphs
2015-05-18 13:34:43 -07:00
zsxwing ff71d34e00 [SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
Learnt a lesson from SPARK-7655: Spark should avoid to use `scala.concurrent.ExecutionContext.Implicits.global` because the user may submit blocking actions to `scala.concurrent.ExecutionContext.Implicits.global` and exhaust all threads in it. This could crash Spark. So Spark should always use its own thread pools for safety.

This PR removes all usages of `scala.concurrent.ExecutionContext.Implicits.global` and uses proper thread pools to replace them.

Author: zsxwing <zsxwing@gmail.com>

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
2015-05-17 20:37:19 -07:00
zsxwing cf842d42a7 [SPARK-7650] [STREAMING] [WEBUI] Move streaming css and js files to the streaming project
cc tdas

Author: zsxwing <zsxwing@gmail.com>

Closes #6160 from zsxwing/SPARK-7650 and squashes the following commits:

fe6ae15 [zsxwing] Fix the import order
a4ffd99 [zsxwing] Merge branch 'master' into SPARK-7650
dc402b6 [zsxwing] Move streaming css and js files to the streaming project
2015-05-14 23:51:41 -07:00
zsxwing b208f998b5 [SPARK-7645] [STREAMING] [WEBUI] Show milliseconds in the UI if the batch interval < 1 second
I also updated the summary of the Streaming page.

![screen shot 2015-05-14 at 11 52 59 am](https://cloud.githubusercontent.com/assets/1000778/7640103/13cdf68e-fa36-11e4-84ec-e2a3954f4319.png)
![screen shot 2015-05-14 at 12 39 33 pm](https://cloud.githubusercontent.com/assets/1000778/7640151/4cc066ac-fa36-11e4-8494-2821d6a6f17c.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #6154 from zsxwing/SPARK-7645 and squashes the following commits:

5db6ca1 [zsxwing] Add UIUtils.formatBatchTime
e4802df [zsxwing] Show milliseconds in the UI if the batch interval < 1 second
2015-05-14 16:58:36 -07:00
Tathagata Das bce00dac40 [SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala API. The drawbacks are.

* Hard to implement in python.
* New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553

Furthermore, there is now a direct way get an existing active SparkContext or create a new on - SparkContext.getOrCreate(conf). Its better to use this to get the SparkContext rather than have a new API to explicitly pass the context.

So in this PR I have
* Removed the new versions of StreamingContext.getOrCreate() which took SparkContext
* Added the ability to pick up existing SparkContext when the StreamingContext tries to create a SparkContext.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6096 from tdas/SPARK-6752 and squashes the following commits:

53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
2015-05-13 17:33:15 -07:00
Andrew Or bb6dec3b16 [STREAMING] [MINOR] Keep streaming.UIUtils private
zsxwing

Author: Andrew Or <andrew@databricks.com>

Closes #6134 from andrewor14/private-streaming-uiutils and squashes the following commits:

225df94 [Andrew Or] Privatize class
2015-05-13 16:31:24 -07:00
zsxwing bec938f777 [SPARK-7589] [STREAMING] [WEBUI] Make "Input Rate" in the Streaming page consistent with other pages
This PR makes "Input Rate" in the Streaming page consistent with Job and Stage pages.

![screen shot 2015-05-12 at 5 03 35 pm](https://cloud.githubusercontent.com/assets/1000778/7601444/f943f8ac-f8ca-11e4-8280-a715d814f434.png)
![screen shot 2015-05-12 at 5 07 25 pm](https://cloud.githubusercontent.com/assets/1000778/7601445/f9571c0c-f8ca-11e4-9b12-9317cb55c002.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #6102 from zsxwing/SPARK-7589 and squashes the following commits:

2745225 [zsxwing] Make "Input Rate" in the Streaming page consistent with other pages
2015-05-13 10:01:26 -07:00
Tathagata Das 23f7d66d51 [SPARK-7554] [STREAMING] Throw exception when an active/stopped StreamingContext is used to create DStreams and output operations
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6099 from tdas/SPARK-7554 and squashes the following commits:

2cd4158 [Tathagata Das] Throw exceptions on attempts to add stuff to active and stopped contexts.
2015-05-12 17:07:21 -07:00
Tathagata Das 00e7b09a0b [SPARK-7553] [STREAMING] Added methods to maintain a singleton StreamingContext
In a REPL/notebook environment, its very easy to lose a reference to a StreamingContext by overriding the variable name. So if you happen to execute the following commands
```
val ssc = new StreamingContext(...) // cmd 1
ssc.start() // cmd 2
...
val ssc = new StreamingContext(...) // accidentally run cmd 1 again
```
The value of ssc will be overwritten. Now you can neither start the new context (as only one context can be started), nor stop the previous context (as the reference is lost).
Hence its best to maintain a singleton reference to the active context, so that we never loose reference for the active context.
Since this problem occurs useful in REPL environments, its best to add this as an Experimental support in the Scala API only so that it can be used in Scala REPLs and notebooks.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6070 from tdas/SPARK-7553 and squashes the following commits:

731c9a1 [Tathagata Das] Fixed style
a797171 [Tathagata Das] Added more unit tests
19fc70b [Tathagata Das] Added :: Experimental :: in docs
64706c9 [Tathagata Das] Fixed test
634db5d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7553
3884a25 [Tathagata Das] Fixing test bug
d37a846 [Tathagata Das] Added getActive and getActiveOrCreate
2015-05-12 16:44:14 -07:00
zsxwing 1422e79e51 [SPARK-7406] [STREAMING] [WEBUI] Add tooltips for "Scheduling Delay", "Processing Time" and "Total Delay"
Screenshots:
![screen shot 2015-05-06 at 2 29 03 pm](https://cloud.githubusercontent.com/assets/1000778/7504129/9c57f710-f3fc-11e4-9c6e-1b79c17c546d.png)

![screen shot 2015-05-06 at 2 24 35 pm](https://cloud.githubusercontent.com/assets/1000778/7504140/b63bb216-f3fc-11e4-83a5-6dfc6481d192.png)

tdas as we discussed offline

Author: zsxwing <zsxwing@gmail.com>

Closes #5952 from zsxwing/SPARK-7406 and squashes the following commits:

2b004ea [zsxwing] Merge branch 'master' into SPARK-7406
e9eb506 [zsxwing] Update tooltip contents
2215b2a [zsxwing] Add tooltips for "Scheduling Delay", "Processing Time" and "Total Delay"
2015-05-12 14:41:21 -07:00
Tathagata Das ec6f2a9774 [SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6060 from tdas/SPARK-7532 and squashes the following commits:

6fe2e83 [Tathagata Das] Update docs
7dadfc3 [Tathagata Das] Fixed bug again
99c7678 [Tathagata Das] Added logInfo
65aec20 [Tathagata Das] Fix bug
5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532
1a9a818 [Tathagata Das] Fix scaladoc
c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception
2015-05-12 08:48:24 -07:00
Marcelo Vanzin 82e890fb19 [SPARK-7485] [BUILD] Remove pyspark files from assembly.
The sbt part of the build is hacky; it basically tricks sbt
into generating the zip by using a generator, but returns
an empty list for the generated files so that nothing is
actually added to the assembly.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6022 from vanzin/SPARK-7485 and squashes the following commits:

22c1e04 [Marcelo Vanzin] Remove unneeded code.
4893622 [Marcelo Vanzin] [SPARK-7485] [build] Remove pyspark files from assembly.
2015-05-12 01:39:21 -07:00
Tathagata Das f9c7580ada [SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6058 from tdas/SPARK-7530 and squashes the following commits:

80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
2015-05-11 18:53:50 -07:00
jerryshao 25c01c5484 [STREAMING] [MINOR] Close files correctly when iterator is finished in streaming WAL recovery
Currently there's no chance to close the file correctly after the iteration is finished, change to `CompletionIterator` to avoid resource leakage.

Author: jerryshao <saisai.shao@intel.com>

Closes #6050 from jerryshao/close-file-correctly and squashes the following commits:

52dfaf5 [jerryshao] Close files correctly when iterator is finished
2015-05-11 14:38:58 -07:00
Tathagata Das 1b46556999 [SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start multiple StreamingContexts in the same JVM
Currently attempt to start a streamingContext while another one is started throws a confusing exception that the action name JobScheduler is already registered. Instead its best to throw a proper exception as it is not supported.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5907 from tdas/SPARK-7361 and squashes the following commits:

fb81c4a [Tathagata Das] Fix typo
a9cd5bb [Tathagata Das] Added startSite to StreamingContext
5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7361
5870e2b [Tathagata Das] Added check for multiple streaming contexts
2015-05-11 10:58:56 -07:00
Wesley Miao d70a076892 [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes #5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2015-05-11 12:20:06 +01:00
zsxwing 22ab70e06e [SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly information when jobs are dropped by SparkListener
If jobs are dropped by SparkListener, at least we can show the job ids in BatchPage. Screenshot:

![b1](https://cloud.githubusercontent.com/assets/1000778/7434968/f19aa784-eff3-11e4-8f86-36a073873574.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #5840 from zsxwing/SPARK-7305 and squashes the following commits:

aca0ba6 [zsxwing] Fix the code style
718765e [zsxwing] Make generateNormalJobRow private
8073b03 [zsxwing] Merge branch 'master' into SPARK-7305
83dec11 [zsxwing] Make BatchPage show friendly information when jobs are dropped by SparkListener
2015-05-07 17:34:44 -07:00
Tathagata Das 01187f59b3 [SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop()
In environments like notebooks, the SparkContext is managed by the underlying infrastructure and it is expected that the SparkContext will not be stopped. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side-effect. This PR adds a configuration in SparkConf that sets the default StreamingContext stop behavior. It should be such that the existing behavior does not change for existing users.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5929 from tdas/SPARK-7217 and squashes the following commits:

869a763 [Tathagata Das] Changed implementation.
685fe00 [Tathagata Das] Added configuration
2015-05-07 00:24:44 -07:00
Tathagata Das cfdadcbd2b [SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5961 from tdas/SPARK-7430 and squashes the following commits:

d654978 [Tathagata Das] Fix scala style
fbf7174 [Tathagata Das] Added more verbose assert failure messages.
6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase
2015-05-07 00:21:10 -07:00
zsxwing 14502d5e56 [SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn't report InputInfo
The bug is because SPARK-7139 removed some codes from SPARK-7112 unintentionally here: 1854ac326a (diff-5c8651dd78abd20439b8eb938175075dL72)

This PR just added them back and added some assertions in the tests to verify it.

Author: zsxwing <zsxwing@gmail.com>

Closes #5950 from zsxwing/SPARK-7405 and squashes the following commits:

675f5d9 [zsxwing] Fix the bug that ReceiverInputDStream doesn't report InputInfo
2015-05-06 18:07:00 -07:00
zsxwing 489700c809 [SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics
This is the initial work of SPARK-6939. Not yet ready for code review. Here are the screenshots:

![graph1](https://cloud.githubusercontent.com/assets/1000778/7165766/465942e0-e3dc-11e4-9b05-c184b09d75dc.png)

![graph2](https://cloud.githubusercontent.com/assets/1000778/7165779/53f13f34-e3dc-11e4-8714-a4a75b7e09ff.png)

TODOs:
- [x] Display more information on mouse hover
- [x] Align the timeline and distribution graphs
- [x] Clean up the codes

Author: zsxwing <zsxwing@gmail.com>

Closes #5533 from zsxwing/SPARK-6939 and squashes the following commits:

9f7cd19 [zsxwing] Merge branch 'master' into SPARK-6939
deacc3f [zsxwing] Remove unused import
cd03424 [zsxwing] Fix .rat-excludes
70cc87d [zsxwing] Streaming Scheduling Delay => Scheduling Delay
d457277 [zsxwing] Fix UIUtils in BatchPage
b3f303e [zsxwing] Add comments for unclear classes and methods
ff0bff8 [zsxwing] Make InputDStream.name private[streaming]
cc392c5 [zsxwing] Merge branch 'master' into SPARK-6939
e275e23 [zsxwing] Move time related methods to Streaming's UIUtils
d5d86f6 [zsxwing] Fix incorrect lastErrorTime
3be4b7a [zsxwing] Use InputInfo
b50fa32 [zsxwing] Jump to the batch page when clicking a point in the timeline graphs
203605d [zsxwing] Merge branch 'master' into SPARK-6939
74307cf [zsxwing] Reuse the data for histogram graphs to reduce the page size
2586916 [zsxwing] Merge branch 'master' into SPARK-6939
70d8533 [zsxwing] Remove BatchInfo.numRecords and a few renames
7bbdc0a [zsxwing] Hide the receiver sub table if no receiver
a2972e9 [zsxwing] Add some ui tests for StreamingPage
fd03ad0 [zsxwing] Add a test to verify no memory leak
4a8f886 [zsxwing] Merge branch 'master' into SPARK-6939
18607a1 [zsxwing] Merge branch 'master' into SPARK-6939
d0b0aec [zsxwing] Clean up the codes
a459f49 [zsxwing] Add a dash line to processing time graphs
8e4363c [zsxwing] Prepare for the demo
c81a1ee [zsxwing] Change time unit in the graphs automatically
4c0b43f [zsxwing] Update Streaming UI
04c7500 [zsxwing] Make the server and client use the same timezone
fed8219 [zsxwing] Move the x axis at the top and show a better tooltip
c23ce10 [zsxwing] Make two graphs close
d78672a [zsxwing] Make the X axis use the same range
881c907 [zsxwing] Use histogram for distribution
5688702 [zsxwing] Fix the unit test
ddf741a [zsxwing] Fix the unit test
ad93295 [zsxwing] Remove unnecessary codes
a0458f9 [zsxwing] Clean the codes
b82ed1e [zsxwing] Update the graphs as per comments
dd653a1 [zsxwing] Add timeline and histogram graphs for streaming statistics
2015-05-05 12:52:16 -07:00
Andrew Or 57e9f29e17 [SPARK-7318] [STREAMING] DStream cleans objects that are not closures
I added a check in `ClosureCleaner#clean` to fail fast if this is detected in the future. tdas

Author: Andrew Or <andrew@databricks.com>

Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits:

8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure
5ee4e25 [Andrew Or] Fix tests
eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner
67eeff4 [Andrew Or] Add tests
a4fa768 [Andrew Or] Clean the closure, not the RDD
2015-05-05 09:37:49 -07:00
zsxwing c6d1efba29 [SPARK-7350] [STREAMING] [WEBUI] Attach the Streaming tab when calling ssc.start()
It's meaningless to display the Streaming tab before `ssc.start()`. So we should attach it in the `ssc.start` method.

Author: zsxwing <zsxwing@gmail.com>

Closes #5898 from zsxwing/SPARK-7350 and squashes the following commits:

e676487 [zsxwing] Attach the Streaming tab when calling ssc.start()
2015-05-05 15:09:58 +01:00
zsxwing 4d29867ede [SPARK-7341] [STREAMING] [TESTS] Fix the flaky test: org.apache.spark.stre...
...aming.InputStreamsSuite.socket input stream

Remove non-deterministic "Thread.sleep" and use deterministic strategies to fix the flaky failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/2127/testReport/junit/org.apache.spark.streaming/InputStreamsSuite/socket_input_stream/

Author: zsxwing <zsxwing@gmail.com>

Closes #5891 from zsxwing/SPARK-7341 and squashes the following commits:

611157a [zsxwing] Add wait methods to BatchCounter and use BatchCounter in InputStreamsSuite
014b58f [zsxwing] Use withXXX to clean up the resources
c9bf746 [zsxwing] Move 'waitForStart' into the 'start' method and fix the code style
9d0de6d [zsxwing] [SPARK-7341][Streaming][Tests] Fix the flaky test: org.apache.spark.streaming.InputStreamsSuite.socket input stream
2015-05-05 02:15:39 -07:00
jerryshao 8436f7e98e [SPARK-7113] [STREAMING] Support input information reporting for Direct Kafka stream
Author: jerryshao <saisai.shao@intel.com>

Closes #5879 from jerryshao/SPARK-7113 and squashes the following commits:

b0b506c [jerryshao] Address the comments
0babe66 [jerryshao] Support input information reporting for Direct Kafka stream
2015-05-05 02:01:06 -07:00
Tathagata Das 1854ac326a [SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure
- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping block fetch when the block is known to not exist in Spark

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
2015-05-05 01:45:19 -07:00
Tathagata Das ecc6eb50a5 [SPARK-7315] [STREAMING] [TEST] Fix flaky WALBackedBlockRDDSuite
`FileUtils.getTempDirectoryPath()` path may or may not exist. We want to make sure that it does not exist.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5853 from tdas/SPARK-7315 and squashes the following commits:

141afd5 [Tathagata Das] Removed use of FileUtils
b08d4f1 [Tathagata Das] Fix flaky WALBackedBlockRDDSuite
2015-05-02 01:53:14 -07:00
jerryshao b88c275e6e [SPARK-7112][Streaming][WIP] Add a InputInfoTracker to track all the input streams
Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5680 from jerryshao/SPARK-7111 and squashes the following commits:

339f854 [Saisai Shao] Add an end-to-end test
812bcaf [jerryshao] Continue address the comments
abd0036 [jerryshao] Address the comments
727264e [jerryshao] Fix comment typo
6682bef [jerryshao] Fix compile issue
8325787 [jerryshao] Fix rebase issue
17fa251 [jerryshao] Refactor to build InputInfoTracker
ee1b536 [jerryshao] Add DirectStreamTracker to track the direct streams
2015-05-01 17:46:06 -07:00
zsxwing ebc25a4ddf [SPARK-7309] [CORE] [STREAMING] Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler
Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler when stopping them.

Author: zsxwing <zsxwing@gmail.com>

Closes #5845 from zsxwing/SPARK-7309 and squashes the following commits:

6c004fd [zsxwing] Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler
2015-05-01 17:41:55 -07:00
zsxwing 69a739c7f5 [SPARK-7282] [STREAMING] Fix the race conditions in StreamingListenerSuite
Fixed the following flaky test
```Scala
[info] StreamingListenerSuite:
[info] - batch info reporting (782 milliseconds)
[info] - receiver info reporting *** FAILED *** (3 seconds, 911 milliseconds)
[info]   The code passed to eventually never returned normally. Attempted 10 times over 3.4735783689999997 seconds. Last failure message: 0 did not equal 1. (StreamingListenerSuite.scala:104)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply$mcV$sp(StreamingListenerSuite.scala:104)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply(StreamingListenerSuite.scala:94)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply(StreamingListenerSuite.scala:94)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.runTest(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.org$scalatest$BeforeAndAfter$$super$run(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.run(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
[info]   Cause: org.scalatest.exceptions.TestFailedException: 0 did not equal 1
[info]   at org.scalatest.MatchersHelper$.newTestFailedException(MatchersHelper.scala:160)
[info]   at org.scalatest.Matchers$ShouldMethodHelper$.shouldMatcher(Matchers.scala:6231)
[info]   at org.scalatest.Matchers$AnyShouldWrapper.should(Matchers.scala:6277)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(StreamingListenerSuite.scala:105)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2$$anonfun$apply$mcV$sp$1.apply(StreamingListenerSuite.scala:104)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2$$anonfun$apply$mcV$sp$1.apply(StreamingListenerSuite.scala:104)
[info]   at org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:394)
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:408)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
[info]   at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply$mcV$sp(StreamingListenerSuite.scala:104)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply(StreamingListenerSuite.scala:94)
[info]   at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply(StreamingListenerSuite.scala:94)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.runTest(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.org$scalatest$BeforeAndAfter$$super$run(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
[info]   at org.apache.spark.streaming.StreamingListenerSuite.run(StreamingListenerSuite.scala:34)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
```

The original codes didn't have a memory barrier in the `eventually` closure, which might fail the test, because JVM doesn't guarantee the memory consistency between different threads without  a memory barrier.

This PR used `ConcurrentLinkedQueue` to set up the memory barrier.

Author: zsxwing <zsxwing@gmail.com>

Closes #5812 from zsxwing/SPARK-7282 and squashes the following commits:

59115ef [zsxwing] Use SynchronizedBuffer
014dd2b [zsxwing] Fix the race conditions in StreamingListenerSuite
2015-04-30 21:32:11 -07:00
zsxwing 1b7106b867 [SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch
This is an initial commit for SPARK-6862. Once SPARK-6796 is merged, I will add the links to StreamingPage so that the user can jump to BatchPage.

Screenshots:
![success](https://cloud.githubusercontent.com/assets/1000778/7102439/bbe75406-e0b3-11e4-84fe-3e6de629a49a.png)
![failure](https://cloud.githubusercontent.com/assets/1000778/7102440/bc124454-e0b3-11e4-921a-c8b39d6b61bc.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #5473 from zsxwing/SPARK-6862 and squashes the following commits:

0727d35 [zsxwing] Change BatchUIData to a case class
b380cfb [zsxwing] Add createJobStart to eliminate duplicate codes
9a3083d [zsxwing] Rename XxxDatas -> XxxData
087ba98 [zsxwing] Refactor BatchInfo to store only necessary fields
cb62e4f [zsxwing] Use Seq[(OutputOpId, SparkJobId)] to store the id relations
72f8e7e [zsxwing] Add unit tests for BatchPage
1282b10 [zsxwing] Handle some corner cases and add tests for StreamingJobProgressListener
77a69ae [zsxwing] Refactor codes as per TD's comments
35ffd80 [zsxwing] Merge branch 'master' into SPARK-6862
15bdf9b [zsxwing] Add batch links and unit tests
4bf66b6 [zsxwing] Merge branch 'master' into SPARK-6862
7168807 [zsxwing] Limit the max width of the error message and fix nits in the UI
0b226f9 [zsxwing] Change 'Last Error' to 'Error'
fc98a43 [zsxwing] Put clearing local properties to finally and remove redundant private[streaming]
0c7b2eb [zsxwing] Add BatchPage to display details of a batch
2015-04-29 18:22:14 -07:00
Tathagata Das a9c4e29950 [SPARK-6752] [STREAMING] [REOPENED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
Original PR #5428 got reverted due to issues between MutableBoolean and Hadoop 1.0.4 (see JIRA). This replaces MutableBoolean with AtomicBoolean.

srowen pwendell

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5773 from tdas/SPARK-6752 and squashes the following commits:

a0c0ead [Tathagata Das] Fix for hadoop 1.0.4
70ae85b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
2015-04-29 13:10:31 -07:00
Tathagata Das 1868bd40dc [SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable
Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan.

https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing

Things to add.
* Unit tests for WriteAheadLogUtils

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5645 from tdas/wal-pluggable and squashes the following commits:

2c431fd [Tathagata Das] Minor fixes.
c2bc7384 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it
2015-04-29 13:06:11 -07:00
Reynold Xin 687273d915 [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
The old naming scheme was very confusing between askWithReply and sendWithReply. I also divided RpcEnv.scala into multiple files.

Author: Reynold Xin <rxin@databricks.com>

Closes #5768 from rxin/rpc-rename and squashes the following commits:

a84058e [Reynold Xin] [SPARK-7223] Rename RPC askWithReply -> askWithReply, sendWithReply -> ask.
2015-04-29 09:46:37 -07:00
Tathagata Das 5c8f4bd5fa [SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback
This is to ensure that receivers that receive data in small batches (like Kinesis) and want to add them but want the callback function to be called only once. This is for internal use only for improvement to Kinesis Receiver that we are planning to do.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5695 from tdas/SPARK-7138 and squashes the following commits:

a35cf7d [Tathagata Das] Fixed style.
a7a4cb9 [Tathagata Das] Added extra method to BlockGenerator.
2015-04-28 19:31:57 -07:00
Patrick Wendell a61d65fc8b Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext"
This reverts commit 534f2a4362.
2015-04-25 10:37:34 -07:00
Tathagata Das 534f2a4362 [SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
Currently if you want to create a StreamingContext from checkpoint information, the system will create a new SparkContext. This prevent StreamingContext to be recreated from checkpoints in managed environments where SparkContext is precreated.

The solution in this PR: Introduce the following methods on StreamingContext
1. `new StreamingContext(checkpointDirectory, sparkContext)`
   Recreate StreamingContext from checkpoint using the provided SparkContext
2. `StreamingContext.getOrCreate(checkpointDirectory, sparkContext, createFunction: SparkContext => StreamingContext)`
   If checkpoint file exists, then recreate StreamingContext using the provided SparkContext (that is, call 1.), else create StreamingContext using the provided createFunction

TODO: the corresponding Java and Python API has to be added as well.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5428 from tdas/SPARK-6752 and squashes the following commits:

94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
2015-04-23 11:29:34 -07:00
Reynold Xin b69c4f9b2e Disable flaky test: ReceiverSuite "block generator throttling". 2015-04-22 21:24:22 -07:00
zsxwing 33b85620f9 [SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176

What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time.

Author: zsxwing <zsxwing@gmail.com>

Closes #5631 from zsxwing/thread-utils and squashes the following commits:

9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
2015-04-22 11:08:59 -07:00
emres c25ca7c5a1 SPARK-3276 Added a new configuration spark.streaming.minRememberDuration
SPARK-3276 Added a new configuration parameter ``spark.streaming.minRememberDuration``, with a default value of 1 minute.

So that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as threshold for remembering.

Author: emres <emre.sevinc@gmail.com>

Closes #5438 from emres/SPARK-3276 and squashes the following commits:

766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds method.
affee1d [emres] SPARK-3276 Changed the property name and variable name for minRememberDuration
c9d58ca [emres] SPARK-3276 Minor code re-formatting.
1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf,  and also getLong method directly.
bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class
daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields.
43cc1ce [emres] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute.
2015-04-21 16:39:56 -04:00
David McGuire 5fea3e5c36 [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError
A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case of the calculated amount of time having elapsed can't be reached before available stack space is exhausted. A fix to this truncation error is included in this patch.

However, even with the defect patched, the accuracy of the existing implementation is abysmal (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); as such, when the error bounds were tightened down to [-5%, +5%], the existing implementation failed to meet the new, tightened, requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass.

Author: David McGuire <david.mcguire2@nike.com>

Closes #5559 from dmcguire81/master and squashes the following commits:

d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing
8be6934 [David McGuire] Fix spacing per code review
90e98b9 [David McGuire] Address scalastyle errors
29011bd [David McGuire] Further ratchet down the error margins
b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator
8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code
70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent
82ee46d [David McGuire] Replace guard clause with nested conditional
2794717 [David McGuire] Replace the RateLimiter with the Guava implementation
38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail
24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion
d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s
2015-04-21 07:21:10 -04:00
zsxwing c776ee8a6f [SPARK-6979][Streaming] Replace JobScheduler.eventActor and JobGenerator.eventActor with EventLoop
Title says it all.

cc rxin tdas

Author: zsxwing <zsxwing@gmail.com>

Closes #5554 from zsxwing/SPARK-6979 and squashes the following commits:

5304350 [zsxwing] Fix NotSerializableException
e9d3479 [zsxwing] Add blank lines
633e279 [zsxwing] Fix NotSerializableException
e496ace [zsxwing] Replace JobGenerator.eventActor with EventLoop
ec6ec58 [zsxwing] Fix the import order
ce0fa73 [zsxwing] Replace JobScheduler.eventActor with EventLoop
2015-04-19 20:48:36 -07:00
zsxwing d8e1b7b06c [SPARK-6983][Streaming] Update ReceiverTrackerActor to use the new Rpc interface
A subtask of [SPARK-5293](https://issues.apache.org/jira/browse/SPARK-5293)

Author: zsxwing <zsxwing@gmail.com>

Closes #5557 from zsxwing/SPARK-6983 and squashes the following commits:

e777e9f [zsxwing] Update ReceiverTrackerActor to use the new Rpc interface
2015-04-19 20:35:43 -07:00
zsxwing 6de282e2de [SPARK-6796][Streaming][WebUI] Add "Active Batches" and "Completed Batches" lists to StreamingPage
This PR adds two lists, `Active Batches` and `Completed Batches`. Here is the screenshot:

![batch_list](https://cloud.githubusercontent.com/assets/1000778/7060458/d8898572-deb3-11e4-938b-6f8602c71a9f.png)

Due to [SPARK-6766](https://issues.apache.org/jira/browse/SPARK-6766), I need to merge #5414 in my local machine to get the above screenshot.

Author: zsxwing <zsxwing@gmail.com>

Closes #5434 from zsxwing/SPARK-6796 and squashes the following commits:

be50fc6 [zsxwing] Fix the code style
51b792e [zsxwing] Fix the unit test
6f3078e [zsxwing] Make 'startTime' readable
f40e0a9 [zsxwing] Merge branch 'master' into SPARK-6796
2525336 [zsxwing] Rename 'Processed batches' and 'Waiting batches' and also add links
a69c091 [zsxwing] Show the number of total completed batches too
a12ad7b [zsxwing] Change 'records' to 'events' in the UI
86b5e7f [zsxwing] Make BatchTableBase abstract
b248787 [zsxwing] Add tests to verify the new tables
d18ab7d [zsxwing] Fix the code style
6ceffb3 [zsxwing] Add "Active Batches" and "Completed Batches" lists to StreamingPage
2015-04-14 16:51:36 -07:00
Ilya Ganelin c4ab255e94 [SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to do the conversion from times specified as e.g. 120s, 240ms, 360us to convert to a consistent internal representation. I've updated usage of these constants throughout the code to be consistent.

I believe I've captured all usages of time-based properties throughout the code. I've also updated variable names in a number of places to reflect their units for clarity and updated documentation where appropriate.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and addid matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes'
bf779b0 [Ilya Ganelin] Special handling of overlapping usffixes for java
dd0a680 [Ilya Ganelin] Updated scala code to call into java
b2fc965 [Ilya Ganelin] replaced get or default since it's not present in this version of java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added Unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to US from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add day, minutes, and hours
647b5ac [Ilya Ganelin] Udpated time conversion to use map iterator instead of if fall through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host revceiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion.
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Upated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
2015-04-13 16:28:07 -07:00
Reynold Xin c5b0b296b8 [SPARK-6765] Enable scalastyle on test code.
Turn scalastyle on for all test code. Most of the violations have been resolved in my previous pull requests:

Core: https://github.com/apache/spark/pull/5484
SQL: https://github.com/apache/spark/pull/5412
MLlib: https://github.com/apache/spark/pull/5411
GraphX: https://github.com/apache/spark/pull/5410
Streaming: https://github.com/apache/spark/pull/5409

Author: Reynold Xin <rxin@databricks.com>

Closes #5486 from rxin/test-style-enable and squashes the following commits:

01683de [Reynold Xin] Fixed new code.
a4ab46e [Reynold Xin] Fixed tests.
20adbc8 [Reynold Xin] Missed one violation.
5e36521 [Reynold Xin] [SPARK-6765] Enable scalastyle on test code.
2015-04-13 09:29:04 -07:00
zsxwing 14ce3ea2c9 [SPARK-6860][Streaming][WebUI] Fix the possible inconsistency of StreamingPage
Because `StreamingPage.render` doesn't hold the `listener` lock when generating the content, the different parts of content may have some inconsistent values if `listener` updates its status at the same time. And it will confuse people.

This PR added `listener.synchronized` to make sure we have a consistent view of StreamingJobProgressListener when creating the content.

Author: zsxwing <zsxwing@gmail.com>

Closes #5470 from zsxwing/SPARK-6860 and squashes the following commits:

cec6f92 [zsxwing] Add missing 'synchronized' in StreamingJobProgressListener
7182498 [zsxwing] Add synchronized to make sure we have a consistent view of StreamingJobProgressListener when creating the content
2015-04-13 12:21:29 +01:00
lisurprise cadd7d72c5 [SPARK-6762]Fix potential resource leaks in CheckPoint CheckpointWriter and CheckpointReader
The close action should be placed within finally block to avoid the potential resource leaks

Author: lisurprise <zhichao.li@intel.com>

Closes #5407 from zhichao-li/master and squashes the following commits:

065999f [lisurprise] add guard for null
ef862d6 [lisurprise] remove fs.close
a754adc [lisurprise] refactor with tryWithSafeFinally
824adb3 [lisurprise] close before validation
c877da7 [lisurprise] Fix potential resource leaks
2015-04-13 12:18:05 +01:00
zsxwing 18ca089bed [SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted
This PR includes:

1. Send `StreamingListenerBatchSubmitted` when `JobSet` is submitted
1. Fix `StreamingListenerBatchStarted.batchInfo.processingStartTime`
1. Fix a type: `completedaBatchInfos` -> `completedBatchInfos`

Author: zsxwing <zsxwing@gmail.com>

Closes #5414 from zsxwing/SPARK-6766 and squashes the following commits:

2f85060 [zsxwing] Update tests
ca0955b [zsxwing] Combine unit tests
79b4fed [zsxwing] Add StreamingJobProgressListenerSuite to test StreamingJobProgressListener
fc3a2a1 [zsxwing] Add unit tests for SPARK-6766
74aed99 [zsxwing] Refactor as per TD's suggestion
493f978 [zsxwing] Send StreamingListenerBatchSubmitted when JobSet is submitted; fix StreamingListenerBatchStarted.batchInfo.processingStartTime; fix a typo
2015-04-10 01:51:42 -07:00