Commit graph

4329 commits

Author SHA1 Message Date
Patrick Wendell aa61bfd399 Merge pull request #88 from rxin/clean
Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
2013-10-21 11:57:05 -07:00
Patrick Wendell 35886f3474 Merge pull request #41 from pwendell/shuffle-benchmark
Provide Instrumentation for Shuffle Write Performance

Shuffle write performance can have a major impact on overall job performance. This patch adds a few pieces of instrumentation related to shuffle writes. They are:

1. A listing of the time spent performing blocking writes for each task. This is implemented by keeping track of the aggregate delay seen by many individual writes.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to sync to disk. This is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.

I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
2013-10-20 22:20:32 -07:00
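To make the sync option above concrete: a minimal sketch, assuming the 0.8-era convention of passing Spark settings as JVM system properties before the SparkContext is created (the benchmark job itself is illustrative, not the internal utility from the patch):

```
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Force every shuffle write to sync to disk, bypassing the OS buffer cache,
// so measured write times reflect actual disk throughput.
System.setProperty("spark.shuffle.sync", "true")

val sc = new SparkContext("local[2]", "shuffle-write-benchmark")
sc.parallelize(1 to 1000000, 8)
  .map(i => (i % 100, i))
  .groupByKey()   // triggers a shuffle write on the map side
  .count()
sc.stop()
```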
Reynold Xin 5b9380e017 Merge pull request #89 from rxin/executor
Don't set up the uncaught exception handler in local mode.

This avoids unit test failures for Spark streaming.

    java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
	at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
	at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
	at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
	at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
	at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
	at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)
2013-10-20 21:03:51 -07:00
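The fix boils down to guarding handler installation on the deployment mode. A minimal sketch of that pattern, with the isLocal flag and the handler body as assumptions rather than the actual Executor code:

```
// Only a real cluster executor should install a process-wide handler that
// calls System.exit; in local mode that would kill the test runner's JVM,
// producing failures like the RejectedExecutionException above.
def maybeInstallExitHandler(isLocal: Boolean) {
  if (!isLocal) {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable) {
        e.printStackTrace(System.err)
        System.exit(1)
      }
    })
  }
}
```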
Reynold Xin b4d8478454 Made JobLogger public again and some minor cleanup. 2013-10-20 18:59:28 -07:00
Matei Zaharia 261bcf27b3 Merge pull request #80 from rxin/build
Exclusion rules for Maven build files.
2013-10-20 17:59:51 -07:00
Matei Zaharia edc5e3f8f4 Merge pull request #75 from JoshRosen/block-manager-cleanup
Code de-duplication in BlockManager

The BlockManager has a few methods that duplicate most of their code.  This pull request extracts the duplicated code into private doPut(), doGetLocal(), and doGetRemote() methods that unify the storing/reading of bytes or objects.

I believe that I preserved the logic of the original code, but I'd appreciate some help in reviewing this.
2013-10-20 17:18:06 -07:00
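The shape of that refactoring, sketched with illustrative names and types (not BlockManager's real signatures): both public entry points delegate to one private method, with an Either distinguishing the payload.

```
// Hedged sketch of the de-duplication pattern described above.
class Store {
  def put(id: String, values: Iterator[Any]) { doPut(id, Left(values)) }
  def putBytes(id: String, bytes: Array[Byte]) { doPut(id, Right(bytes)) }

  // Shared bookkeeping (locking, replication, reporting) lives here once;
  // only the final storage step branches on the payload type.
  private def doPut(id: String, data: Either[Iterator[Any], Array[Byte]]) {
    data match {
      case Left(values) => println("storing deserialized values for " + id)
      case Right(bytes) => println("storing " + bytes.length + " raw bytes for " + id)
    }
  }
}
```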
Josh Rosen 68d6806ea4 Minor cleanup based on @aarondav's code review. 2013-10-20 13:20:14 -07:00
Reynold Xin 7414805e4e Don't set up the uncaught exception handler in local mode.
This avoids unit test failures for Spark streaming.
2013-10-20 13:03:48 -07:00
Reynold Xin 8e1937f8ba Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
JobLogger
BlockManagerSlaveActor
2013-10-20 12:22:07 -07:00
Reynold Xin 2a7ae1736a Merge pull request #84 from rxin/kill1
Added documentation for setJobGroup. Also some minor cleanup in SparkContext.
2013-10-20 11:45:21 -07:00
Reynold Xin fabd05dabc Updated setJobGroup documentation and marked dagSchedulerSource and blockManagerSource as private in SparkContext. 2013-10-20 10:54:30 -07:00
Matei Zaharia e4abb75d70 Merge pull request #85 from rxin/clean
Moved the top level spark package object from spark to org.apache.spark

This is a pretty annoying documentation bug ...
2013-10-20 09:38:37 -07:00
Matei Zaharia 747f538925 Merge pull request #83 from ewencp/pyspark-accumulator-add-method
Add an add() method to pyspark accumulators.

Add a regular method for adding a term to accumulators in
pyspark. Currently, if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it involves an assignment. The only way
to do it is to use __iadd__ directly.

Adding this method lets you write code like this:

def main():
    sc = SparkContext()
    accum = sc.accumulator(0)

    rdd = sc.parallelize([1,2,3])
    def f(x):
        accum.add(x)
    rdd.foreach(f)
    print accum.value

where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
2013-10-19 23:40:40 -07:00
Reynold Xin 8396a6649e Moved the top level spark package object from spark to org.apache.spark 2013-10-19 23:26:15 -07:00
Reynold Xin eb9bf69462 Added documentation for setJobGroup. Also some minor cleanup in SparkContext. 2013-10-19 23:16:44 -07:00
Ewen Cheslack-Postava 7eaa56de7f Add an add() method to pyspark accumulators.
Add a regular method for adding a term to accumulators in
pyspark. Currently, if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it involves an assignment. The only way
to do it is to use __iadd__ directly.

Adding this method lets you write code like this:

def main():
    sc = SparkContext()
    accum = sc.accumulator(0)

    rdd = sc.parallelize([1,2,3])
    def f(x):
        accum.add(x)
    rdd.foreach(f)
    print accum.value

where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
2013-10-19 19:55:39 -07:00
Josh Rosen 867d8fdf2a De-duplicate code in dropOld[Non]BroadcastBlocks. 2013-10-19 19:53:12 -07:00
Josh Rosen 6925a1322b Code de-duplication in put() and putBytes(). 2013-10-19 19:53:12 -07:00
Josh Rosen 8279185651 De-duplication in getRemote() and getRemoteBytes(). 2013-10-19 19:53:12 -07:00
Josh Rosen babccb695e De-duplication in getLocal() and getLocalBytes(). 2013-10-19 19:52:10 -07:00
Reynold Xin 4e44d65b5e Exclusion rules for Maven build files. 2013-10-19 12:35:55 -07:00
Reynold Xin 6511bbe2ad Merge pull request #78 from mosharaf/master
Removed BitTorrentBroadcast and TreeBroadcast.

TorrentBroadcast replaces both.
2013-10-19 11:34:56 -07:00
Mosharaf Chowdhury 29617c27a1 Removed BitTorrentBroadcast and TreeBroadcast. TorrentBroadcast is replacing both. 2013-10-18 23:54:11 -07:00
Reynold Xin f628804c02 Merge pull request #76 from pwendell/master
Clarify compression property.

Clarifies that this governs compression of internal data, not input
data or output data.
2013-10-18 23:19:42 -07:00
Patrick Wendell 6b62836285 Clarify compression property.
Clarifies that this governs compression of internal data, not input
data or output data.
2013-10-18 23:08:44 -07:00
Matei Zaharia 599dcb0ddf Merge pull request #74 from rxin/kill
Job cancellation via job group id.

This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.

An example:

    sc.setJobGroup("this_is_the_group_id", "some job description")
    sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()

In a separate thread:

    sc.cancelJobGroup("this_is_the_group_id")
2013-10-18 22:49:00 -07:00
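Putting both halves together, a self-contained sketch of the workflow described in the PR (master URL, timings, and the exception handling are illustrative assumptions):

```
import org.apache.spark.SparkContext

object CancelDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "cancel-demo")
    val runner = new Thread {
      override def run() {
        sc.setJobGroup("this_is_the_group_id", "some job description")
        try {
          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
        } catch {
          case e: Exception => println("job cancelled: " + e.getMessage)
        }
      }
    }
    runner.start()
    Thread.sleep(1000)                        // let a few tasks start
    sc.cancelJobGroup("this_is_the_group_id") // cancels every job in the group
    runner.join()
    sc.stop()
  }
}
```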
Reynold Xin 806f3a3adb Job cancellation via job group id. 2013-10-18 21:46:08 -07:00
Matei Zaharia 8de9706b86 Merge pull request #66 from shivaram/sbt-assembly-deps
Add SBT target to assemble dependencies

This pull request is an attempt to address the long assembly build times during development. Instead of rebuilding the assembly jar for every Spark change, this pull request adds a new SBT target `spark` that packages all the Spark modules and builds an assembly of the dependencies.

So the workflow should now be something like

```
./sbt/sbt spark # Doing this once should suffice
## Make changes
./sbt/sbt compile
./sbt/sbt test or ./spark-shell
```
2013-10-18 20:32:39 -07:00
Matei Zaharia e5316d0685 Merge pull request #68 from mosharaf/master
Faster and stable/reliable broadcast

HttpBroadcast is noticeably slow, but the alternatives (TreeBroadcast or BitTorrentBroadcast) are notoriously unreliable. The main problem with them is that they try to manage the memory for the pieces of a broadcast themselves. Right now, the BroadcastManager does not know which machines the tasks reading from a broadcast variable are running on, or when they have finished. Consequently, we try to guess, and often guess wrong, which blows up the memory usage and kills/hangs jobs.

This very simple implementation solves the problem by not trying to manage the intermediate pieces; instead, it offloads that duty to the BlockManager, which is quite good at juggling blocks. Otherwise, it is very similar to the BitTorrentBroadcast implementation (without the fancy optimizations). And it runs much faster than the HttpBroadcast we have right now.

I've been using this for another project for the last couple of weeks, and just today did some benchmarking against the Http one. The following shows the improvements for increasing broadcast sizes on cold runs. Each line represents a different number of receivers.
![fix-bc-first](https://f.cloud.github.com/assets/232966/1349342/ffa149e4-36e7-11e3-9fa6-c74555829356.png)

After the first broadcast is over, i.e., after the JVM is warmed up and (I think) the HttpBroadcast server is already running, the following are the improvements for warm runs.
![fix-bc-succ](https://f.cloud.github.com/assets/232966/1349352/5a948bae-36e8-11e3-98ce-34f19ebd33e0.jpg)
The curves are not as nice as the cold runs, but the improvements are obvious, especially for larger broadcasts and more receivers.

Depending on how it goes, we should deprecate and/or remove the old TreeBroadcast and BitTorrentBroadcast implementations, and hopefully SPARK-889 will no longer be necessary.
2013-10-18 20:30:56 -07:00
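For reference, a hedged sketch of how the new implementation would be selected via the spark.broadcast.factory property that the commits below touch (the factory class name follows the org.apache.spark.broadcast package layout but is an assumption here):

```
import org.apache.spark.SparkContext

// Choose the Torrent-ish, BlockManager-backed broadcast before the
// SparkContext is created; 0.8-era settings were JVM system properties.
System.setProperty("spark.broadcast.factory",
  "org.apache.spark.broadcast.TorrentBroadcastFactory")

val sc = new SparkContext("local[4]", "torrent-broadcast-demo")
val payload = sc.broadcast(Array.fill(4 * 1024 * 1024)(1.toByte)) // ~4 MB, one default block
sc.parallelize(1 to 8, 8).map(_ => payload.value.length).collect()
sc.stop()
```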
Matei Zaharia 8d528af829 Merge pull request #71 from aarondav/scdefaults
Spark shell exits if it cannot create SparkContext

Mainly, this occurs if you provide a messed up MASTER url (one that doesn't match one
of our regexes). Previously, we would default to Mesos, fail, and then start the shell
anyway, except that any Spark command would fail. Simply exiting seems clearer.
2013-10-18 20:24:10 -07:00
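A minimal sketch of the fail-fast behavior described above (the surrounding shell-initialization code is assumed, not quoted from the PR):

```
import org.apache.spark.SparkContext

val master = System.getenv("MASTER") match {
  case null => "local"
  case m    => m
}

// If the SparkContext can't be constructed (e.g., a malformed MASTER url),
// exit immediately instead of starting a shell whose commands all fail.
val sc =
  try {
    new SparkContext(master, "Spark shell")
  } catch {
    case e: Exception =>
      e.printStackTrace(System.err)
      sys.exit(1)
  }
```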
Mosharaf Chowdhury 08391dbcb8 Should compile now. 2013-10-17 23:06:17 -07:00
Mosharaf Chowdhury 8612641362 Added an after block to reset spark.broadcast.factory 2013-10-17 22:44:04 -07:00
Aaron Davidson 74737264c4 Spark shell exits if it cannot create SparkContext
Mainly, this occurs if you provide a messed up MASTER url (one that doesn't match one
of our regexes). Previously, we would default to Mesos, fail, and then start the shell
anyway, except that any Spark command would fail.
2013-10-17 18:51:19 -07:00
Mosharaf Chowdhury 90ab55fd37 Merge remote-tracking branch 'upstream/master' 2013-10-17 18:12:28 -07:00
Mosharaf Chowdhury e178ae4e9b BroadcastSuite updated to test both HttpBroadcast and TorrentBroadcast in local, local[N], local-cluster settings. 2013-10-17 16:38:43 -07:00
Matei Zaharia fc26e5b832 Merge pull request #69 from KarthikTunga/master
Fix for issue SPARK-627. Implementing --config argument in the scripts.

This fix is for issue SPARK-627. I added code to handle --config arguments in the scripts. If the <conf-dir> is not a directory, the scripts exit. I removed the --hosts argument; the same effect can be achieved by giving a different config directory. Let me know if an explicit --hosts argument is required.
2013-10-17 13:21:07 -07:00
Mosharaf Chowdhury 6a84e40efe Merge remote-tracking branch 'upstream/master' 2013-10-17 13:14:33 -07:00
Mosharaf Chowdhury 35b2415fb3 Code styling. Updated doc. 2013-10-17 13:14:12 -07:00
Matei Zaharia cf64f63f8a Merge pull request #67 from kayousterhout/remove_tsl
Removed TaskSchedulerListener interface.

The interface was used only by the DAG scheduler (so it wasn't necessary
to define the additional interface), and the naming makes it very
confusing when reading the code (because "listener" was used
to describe the DAG scheduler, rather than SparkListeners, which
implement a nearly-identical interface but serve a different
function).

@mateiz - is there a reason for this interface that I'm missing?
2013-10-17 11:12:28 -07:00
Mosharaf Chowdhury e663750488 Removed unused code.
Changes to match Spark coding style.
2013-10-17 00:19:50 -07:00
Kay Ousterhout 809f547633 Fixed unit tests 2013-10-16 23:16:12 -07:00
KarthikTunga 8537f19268 SPARK-627 , Implementing --config arguments in the scripts 2013-10-16 23:00:33 -07:00
KarthikTunga ff4fb1f7ee SPARK-627 , Implementing --config arguments in the scripts 2013-10-16 22:55:15 -07:00
KarthikTunga a32aa6b351 Implementing --config argument in the scripts 2013-10-16 22:51:09 -07:00
Mosharaf Chowdhury e96bd0068f BroadcastTest2 --> BroadcastTest 2013-10-16 21:33:33 -07:00
Mosharaf Chowdhury a8d0981832 Fixes for the new BlockId naming convention. 2013-10-16 21:33:33 -07:00
Mosharaf Chowdhury feb45d391f Default blockSize is 4MB.
BroadcastTest2 example added for testing broadcasts.
2013-10-16 21:33:33 -07:00
Mosharaf Chowdhury 6e5a60fab4 Removed unnecessary code, and added comment of memory-latency tradeoff. 2013-10-16 21:33:33 -07:00
Mosharaf Chowdhury 4602e2bf6e Torrent-ish broadcast based on BlockManager. 2013-10-16 21:33:33 -07:00
Shivaram Venkataraman 0a4b76fcc2 Rename SBT target to assemble-deps. 2013-10-16 17:05:46 -07:00