The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
The main goal of this refactor was to allow the interposition of a new layer which
maps logical BlockIds to physical locations other than a file with the same name
as the BlockId. In particular, BlockIds will need to be mappable to chunks of files,
as multiple will be stored in the same file.
In order to accomplish this, the following changes have been made:
- Creation of DiskBlockManager, which manages the association of logical BlockIds
to physical disk locations (called FileSegments). By default, Blocks are simply
mapped to physical files of the same name, as before.
- The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager
in order to resolve the actual File location.
- DiskBlockObjectWriter has been merged into BlockObjectWriter.
- The Netty PathResolver has been changed to map BlockIds into FileSegments, as this
codepath is the only one that uses Netty, and that is likely to remain the case.
Overall, I think this refactor produces a clearer division between the logical Block
paradigm and their physical on-disk location. There is now an explicit (and documented)
mapping from one to the other.
Add an add() method to pyspark accumulators.
Add a regular method for adding a term to accumulators in
pyspark. Currently if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it's involves an assignment. The only way
to do it is using __iadd__ directly.
Adding this method lets you write code like this:
def main():
sc = SparkContext()
accum = sc.accumulator(0)
rdd = sc.parallelize([1,2,3])
def f(x):
accum.add(x)
rdd.foreach(f)
print accum.value
where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
Previously, MapOutputTracker contained fields and methods that
were only applicable to the master or worker instances. This
commit introduces a MasterMapOutputTracker class to prevent
the master-specific methods from being accessed on workers.
I also renamed a few methods and made others protected/private.
Add a regular method for adding a term to accumulators in
pyspark. Currently if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it's involves an assignment. The only way
to do it is using __iadd__ directly.
Adding this method lets you write code like this:
def main():
sc = SparkContext()
accum = sc.accumulator(0)
rdd = sc.parallelize([1,2,3])
def f(x):
accum.add(x)
rdd.foreach(f)
print accum.value
where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
Job cancellation via job group id.
This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.
An example:
sc.setJobDescription("this_is_the_group_id", "some job description")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
In a separate thread:
sc.cancelJobGroup("this_is_the_group_id")
Add SBT target to assemble dependencies
This pull request is an attempt to address the long assembly build times during development. Instead of rebuilding the assembly jar for every Spark change, this pull request adds a new SBT target `spark` that packages all the Spark modules and builds an assembly of the dependencies.
So the work flow that should work now would be something like
```
./sbt/sbt spark # Doing this once should suffice
## Make changes
./sbt/sbt compile
./sbt/sbt test or ./spark-shell
```
Faster and stable/reliable broadcast
HttpBroadcast is noticeably slow, but the alternatives (TreeBroadcast or BitTorrentBroadcast) are notoriously unreliable. The main problem with them is they try to manage the memory for the pieces of a broadcast themselves. Right now, the BroadcastManager does not know which machines the tasks reading from a broadcast variable is running and when they have finished. Consequently, we try to guess and often guess wrong, which blows up the memory usage and kills/hangs jobs.
This very simple implementation solves the problem by not trying to manage the intermediate pieces; instead, it offloads that duty to the BlockManager which is quite good at juggling blocks. Otherwise, it is very similar to the BitTorrentBroadcast implementation (without fancy optimizations). And it runs much faster than HttpBroadcast we have right now.
I've been using this for another project for last couple of weeks, and just today did some benchmarking against the Http one. The following shows the improvements for increasing broadcast size for cold runs. Each line represent the number of receivers.
![fix-bc-first](https://f.cloud.github.com/assets/232966/1349342/ffa149e4-36e7-11e3-9fa6-c74555829356.png)
After the first broadcast is over, i.e., after JVM is wormed up and for HttpBroadcast the server is already running (I think), the following are the improvements for warm runs.
![fix-bc-succ](https://f.cloud.github.com/assets/232966/1349352/5a948bae-36e8-11e3-98ce-34f19ebd33e0.jpg)
The curves are not as nice as the cold runs, but the improvements are obvious, specially for larger broadcasts and more receivers.
Depending on how it goes, we should deprecate and/or remove old TreeBroadcast and BitTorrentBroadcast implementations, and hopefully, SPARK-889 will not be necessary any more.
Spark shell exits if it cannot create SparkContext
Mainly, this occurs if you provide a messed up MASTER url (one that doesn't match one
of our regexes). Previously, we would default to Mesos, fail, and then start the shell
anyway, except that any Spark command would fail. Simply exiting seems clearer.
1) This allows the index map to be optimized for Vids
2) This makes the code more readable
2) The Graph API can now return VertexSetRDDs from operations that produce results for vertices
While benchmarking, we accidentally committed some unnecessary changes
to core such as adding logging. These changes make it more difficult to
merge from Spark upstream, so this commit reverts them.
1) Further simplification of the IndexedRDD operations (eliminating some)
2) Aggressive reuse of HashMaps
3) Pipelining join operations within indexedrdd
Mainly, this occurs if you provide a messed up MASTER url (one that doesn't match one
of our regexes). Previously, we would default to Mesos, fail, and then start the shell
anyway, except that any Spark command would fail.
Fix for issue SPARK-627. Implementing --config argument in the scripts.
This code fix is for issue SPARK-627. I added code to consider --config arguments in the scripts. In case the <conf-dir> is not a directory the scripts exit. I removed the --hosts argument. It can be achieved by giving a different config directory. Let me know if an explicit --hosts argument is required.