Commit graph

54 commits

Author SHA1 Message Date
Patrick Wendell ab713af564 Preparing development version 1.4.0-SNAPSHOT 2015-06-02 18:06:41 -07:00
Patrick Wendell 22596c534a Preparing Spark release v1.4.0-rc4 2015-06-02 18:06:35 -07:00
Patrick Wendell e3c35b217c Preparing development version 1.4.0-SNAPSHOT 2015-06-02 17:01:15 -07:00
Patrick Wendell a14fad11ef Preparing Spark release v1.4.0-rc4 2015-06-02 17:01:10 -07:00
Patrick Wendell 92ccc5ba39 Preparing development version 1.4.0-SNAPSHOT 2015-06-02 14:02:19 -07:00
Patrick Wendell d630f4d697 Preparing Spark release v1.4.0-rc4 2015-06-02 14:02:14 -07:00
Patrick Wendell 92a677891c Preparing development version 1.4.0-SNAPSHOT 2015-06-02 08:41:15 -07:00
Patrick Wendell 48c506724a Preparing Spark release v1.4.0-rc4 2015-06-02 08:41:10 -07:00
Patrick Wendell e549874c33 Preparing development version 1.4.0-SNAPSHOT 2015-05-29 13:07:07 -07:00
Patrick Wendell dd109a8746 Preparing Spark release v1.4.0-rc3 2015-05-29 13:06:59 -07:00
Patrick Wendell c68abaa34e Preparing development version 1.4.0-SNAPSHOT 2015-05-29 12:15:18 -07:00
Patrick Wendell fb60503ff2 Preparing Spark release v1.4.0-rc3 2015-05-29 12:15:13 -07:00
Patrick Wendell 6bf5a42084 Preparing development version 1.4.0-SNAPSHOT 2015-05-28 23:40:27 -07:00
Patrick Wendell f2796816be Preparing Spark release v1.4.0-rc3 2015-05-28 23:40:22 -07:00
Patrick Wendell 119c93af9c Preparing development version 1.4.0-SNAPSHOT 2015-05-28 22:57:31 -07:00
Patrick Wendell 2d97d7a0aa Preparing Spark release v1.4.0-rc3 2015-05-28 22:57:26 -07:00
Patrick Wendell 7c342bdd93 Preparing development version 1.4.0-SNAPSHOT 2015-05-27 22:36:30 -07:00
Patrick Wendell 4983dfc878 Preparing Spark release v1.4.0-rc3 2015-05-27 22:36:23 -07:00
Patrick Wendell 947d700ec8 Preparing development version 1.4.0-SNAPSHOT 2015-05-23 20:13:05 -07:00
Patrick Wendell 03fb26a3e5 Preparing Spark release v1.4.0-rc2 2015-05-23 20:13:00 -07:00
Patrick Wendell f2f74b9b1a Preparing development version 1.4.1-SNAPSHOT 2015-05-23 14:59:37 -07:00
Patrick Wendell 0da7396990 Preparing Spark release v1.4.0-rc2-test 2015-05-23 14:59:31 -07:00
Patrick Wendell 8da8caab17 Preparing development version 1.4.1-SNAPSHOT 2015-05-23 14:46:27 -07:00
Patrick Wendell 8f50218f38 Preparing Spark release 1.4.0-rc2-test 2015-05-23 14:46:23 -07:00
Liang-Chi Hsieh e4489c36df [SPARK-7800] isDefined should not marked too early in putNewKey
JIRA: https://issues.apache.org/jira/browse/SPARK-7800

`isDefined` is marked as true twice in `Location.putNewKey`. The first one is unnecessary and will cause problem because it is too early and before some assert checking. E.g., if an attempt with incorrect `keyLengthBytes` marks `isDefined` as true, the location can not be used later.

ping JoshRosen

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #6324 from viirya/dup_isdefined and squashes the following commits:

cbfe03b [Liang-Chi Hsieh] isDefined should not marked too early in putNewKey.

(cherry picked from commit 5a3c04bb92)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
2015-05-21 15:21:23 -07:00
Patrick Wendell 9b37e32c55 Preparing development version 1.4.0-SNAPSHOT 2015-05-20 17:29:00 -07:00
Patrick Wendell 1e458e3553 Preparing Spark release rc-test 2015-05-20 17:28:55 -07:00
pwendell 8d66849862 Preparing development version 1.4.0-SNAPSHOT 2015-05-20 17:26:15 -07:00
pwendell ae29aeaf8e Preparing Spark release rc-test 2015-05-20 17:26:10 -07:00
jenkins 534c787b9f Preparing development version 1.4.0-SNAPSHOT 2015-05-20 16:49:59 -07:00
jenkins 5f4d87f608 Preparing Spark release rc-test 2015-05-20 16:49:54 -07:00
Josh Rosen 82bc518cf8 [SPARK-7251] Perform sequential scan when iterating over BytesToBytesMap
This patch modifies `BytesToBytesMap.iterator()` to iterate through records in the order that they appear in the data pages rather than iterating through the hashtable pointer arrays. This results in fewer random memory accesses, significantly improving performance for scan-and-copy operations.

This is possible because our data pages are laid out as sequences of `[keyLength][data][valueLength][data]` entries.  In order to mark the end of a partially-filled data page, we write `-1` as a special end-of-page length (BytesToByesMap supports empty/zero-length keys and values, which is why we had to use a negative length).

This patch incorporates / closes #5836.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6159 from JoshRosen/SPARK-7251 and squashes the following commits:

05bd90a [Josh Rosen] Compare capacity, not size, to MAX_CAPACITY
2a20d71 [Josh Rosen] Fix maximum BytesToBytesMap capacity
bc4854b [Josh Rosen] Guard against overflow when growing BytesToBytesMap
f5feadf [Josh Rosen] Add test for iterating over an empty map
273b842 [Josh Rosen] [SPARK-7251] Perform sequential scan when iterating over entries in BytesToBytesMap

(cherry picked from commit f2faa7af30)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
2015-05-20 16:43:09 -07:00
Josh Rosen 7cea552e1e [SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation
When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of the new Tungsten's sort-shuffle for jobs with many short-lived tasks by eliminating a major source of GC.

This pull request is a minimum-viable-implementation of this idea.  In its current form, this patch significantly improves performance on a stress test which launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits:

fd6cb55 [Josh Rosen] SoftReference -> WeakReference
b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager

(cherry picked from commit 7956dd7ab0)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
2015-05-20 16:39:36 -07:00
Patrick Wendell 205ed15f29 Preparing development version 1.4.0-SNAPSHOT 2015-05-20 16:30:01 -07:00
Patrick Wendell 09a1c6231e Preparing Spark release rc-test 2015-05-20 16:29:52 -07:00
Patrick Wendell ac3197e1b9 Preparing development version 1.4.1-SNAPSHOT 2015-05-19 09:35:12 +00:00
Patrick Wendell 777a08166f Preparing Spark release v1.4.0-rc1 2015-05-19 09:35:12 +00:00
Patrick Wendell 586ede6b32 Revert "Preparing Spark release v1.4.0-rc1"
This reverts commit 79fb01a3be.
2015-05-19 02:27:14 -07:00
Patrick Wendell e7309ec729 Revert "Preparing development version 1.4.1-SNAPSHOT"
This reverts commit a1d896b85b.
2015-05-19 02:27:07 -07:00
Patrick Wendell a1d896b85b Preparing development version 1.4.1-SNAPSHOT 2015-05-19 07:13:24 +00:00
Patrick Wendell 79fb01a3be Preparing Spark release v1.4.0-rc1 2015-05-19 07:13:24 +00:00
Patrick Wendell b0c63d2413 Revert "Preparing Spark release v1.4.0-rc1"
This reverts commit 38ccef36c1.
2015-05-19 00:10:39 -07:00
Patrick Wendell 198a186ad3 Revert "Preparing development version 1.4.1-SNAPSHOT"
This reverts commit 40190ce226.
2015-05-19 00:10:37 -07:00
Patrick Wendell 40190ce226 Preparing development version 1.4.1-SNAPSHOT 2015-05-19 06:06:41 +00:00
Patrick Wendell 38ccef36c1 Preparing Spark release v1.4.0-rc1 2015-05-19 06:06:40 +00:00
Patrick Wendell 152b0291c0 Revert "Preparing Spark release v1.4.0-rc1"
This reverts commit e8e97e3a63.
2015-05-18 23:06:15 -07:00
Patrick Wendell 4d098bc049 Revert "Preparing development version 1.4.1-SNAPSHOT"
This reverts commit 758ca74bab.
2015-05-18 23:06:13 -07:00
Patrick Wendell 758ca74bab Preparing development version 1.4.1-SNAPSHOT 2015-05-19 05:01:11 +00:00
Patrick Wendell e8e97e3a63 Preparing Spark release v1.4.0-rc1 2015-05-19 05:01:11 +00:00
Josh Rosen c53ebea9db [SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort
This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.

The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf.

The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.

UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:

 - The shuffle dependency specifies no aggregation or output ordering.
 - The shuffle serializer supports relocation of serialized values (this is currently supported
   by KryoSerializer and Spark SQL's custom serializers).
 - The shuffle produces fewer than 16777216 output partitions.
 - No individual record is larger than 128 MB when serialized.

In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer.

At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager.  In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file.

UnsafeShuffleManager optimizes this process in several ways:

 - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization.  See SPARK-4550, where this optimization was first proposed and implemented, for more details.

 - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.

 - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.

 - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition.  This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.

The shuffle read path is unchanged.

This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725).

### Future work

There are several tasks that build upon this patch, which will be left to future work:

- [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data.
- Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL).

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868)
<!-- Reviewable:end -->

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits:

ef0a86e [Josh Rosen] Fix scalastyle errors
7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data.
d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances.
52a9981 [Josh Rosen] Fix some bugs in the address packing code.
51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort
4023fa4 [Josh Rosen] Add @Private annotation to some Java classes.
de40b9d [Josh Rosen] More comments to try to explain metrics code
df07699 [Josh Rosen] Attempt to clarify confusing metrics update code
5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file.
d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID
e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter
4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array'
6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter.
57312c9 [Josh Rosen] Clarify fileBufferSize units
2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter.
fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer.
85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator.
0ad34da [Josh Rosen] Fix off-by-one in nextInt() call
56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding.
4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics.
d4e6d89 [Josh Rosen] Update to bit shifting constants
69d5899 [Josh Rosen] Remove some unnecessary override vals
8531286 [Josh Rosen] Add tests that automatically trigger spills.
7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap().
e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections
39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!)
1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class.
ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable.
ae538dc [Josh Rosen] Document UnsafeShuffleManager.
ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions.
0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass.
b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance.
1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations.
b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless.
f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation.
4a01c45 [Josh Rosen] Remove unnecessary log message
27b18b0 [Josh Rosen] That for inserting records AT the max record size.
fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes.
9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change
fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's
67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager
8f5061a [Josh Rosen] Strengthen assertion to check partitioning
01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite
1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename.
e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors
7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling.
722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests.
9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
b95e642 [Josh Rosen] Refactor and document logic that decides when to spill.
1ce1300 [Josh Rosen] More minor cleanup
5e8cf75 [Josh Rosen] More minor cleanup
e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface.
cfe0ec4 [Josh Rosen] Address a number of minor review comments:
8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics.
b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter.
4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests.
133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter.
f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort.
57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort.
69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode.
7ee918e [Josh Rosen] Re-order imports in tests
3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces
3490512 [Josh Rosen] Misc. cleanup
f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces.
2776aca [Josh Rosen] First passing test for ExternalSorter.
5e100b2 [Josh Rosen] Super-messy WIP on external sort
595923a [Josh Rosen] Remove some unused variables.
8958584 [Josh Rosen] Fix bug in calculating free space in current page.
f17fa8f [Josh Rosen] Add missing newline
c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
b8a09fe [Josh Rosen] Back out accidental log4j.properties change
bfc12d3 [Josh Rosen] Add tests for serializer relocation property.
240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert()
1433b42 [Josh Rosen] Store record length as int instead of long.
026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter
0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java.
87e721b [Josh Rosen] Renaming and comments
d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite
9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter.
253f13e [Josh Rosen] More cleanup
8e3ec20 [Josh Rosen] Begin code cleanup.
4d2f5e1 [Josh Rosen] WIP
3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter
767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter.
e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter
57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter
abf7bfe [Josh Rosen] Add basic test case.
81d52c5 [Josh Rosen] WIP on UnsafeSorter

(cherry picked from commit 73bed408fb)
Signed-off-by: Reynold Xin <rxin@databricks.com>
2015-05-13 17:07:39 -07:00