Refer to the JIRA for the design doc and some perf results.
I wanted to call out some of the more possibly controversial changes up front:
* Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work.
* The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object.
* The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change.
* When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time.
Author: Sandy Ryza <sandy@cloudera.com>
Closes#4450 from sryza/sandy-spark-4550 and squashes the following commits:
8c70dd9 [Sandy Ryza] Fix serialization
9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance
6c54e06 [Sandy Ryza] Fix scalastyle
d8462d8 [Sandy Ryza] SPARK-4550
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle.
Author: Reynold Xin <rxin@databricks.com>
Closes#5342 from rxin/SPARK-6428 and squashes the following commits:
7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
Before diving into review #4450 I did a look through the existing shuffle
code to learn how it works. Unfortunately, there are some very
confusing things in this code. This patch makes a few small changes
to simplify things. It is not easily to concisely describe the changes
because of how convoluted the issues were, but they are fairly small
logically:
1. There is a trait named `ShuffleBlockManager` that only deals with
one logical function which is retrieving shuffle block data given shuffle
block coordinates. This trait has two implementors FileShuffleBlockManager
and IndexShuffleBlockManager. Confusingly the vast majority of those
implementations have nothing to do with this particular functionality.
So I've renamed the trait to ShuffleBlockResolver and documented it.
2. The aforementioned trait had two almost identical methods, for no good
reason. I removed one method (getBytes) and modified callers to use the
other one. I think the behavior is preserved in all cases.
3. The sort shuffle code uses an identifier "0" in the reduce slot of a
BlockID as a placeholder. I made it into a constant since it needs to
be consistent across multiple places.
I think for (3) there is actually a better solution that would avoid the
need to do this type of workaround/hack in the first place, but it's more
complex so I'm punting it for now.
Author: Patrick Wendell <patrick@databricks.com>
Closes#5286 from pwendell/cleanup and squashes the following commits:
c71fbc7 [Patrick Wendell] Open interface back up for testing
f36edd5 [Patrick Wendell] Code review feedback
d1c0494 [Patrick Wendell] Style fix
a406079 [Patrick Wendell] [HOTFIX] Some clean-up in shuffle code.