9149cad57d
### What changes were proposed in this pull request?
The current `MapOutputTracker` class may throw `NegativeArraySizeException` when the number of partitions is large. The `serializeOutputStatuses()` method compresses an array of mapStatuses and writes the binary output into an (Apache) `ByteArrayOutputStream`. When the output is too large, (Apache) `ByteArrayOutputStream.toByteArray()` fails with a negative array size because its index is an `int` and overflows at the 2GB limit.
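The overflow described above can be illustrated with a minimal Java sketch. It is not the code path inside the Apache `ByteArrayOutputStream`; the class `IntOverflowDemo` and its methods are hypothetical and only show how an `int` size that wraps past 2GB leads to `NegativeArraySizeException`.

```java
public class IntOverflowDemo {
    // Doubles a capacity with plain int arithmetic, as a naive growable
    // buffer might. The multiplication wraps silently on overflow.
    static int doubled(int capacity) {
        return capacity * 2;
    }

    // Returns true if allocating an array of the doubled size fails
    // with NegativeArraySizeException.
    static boolean allocationFails(int capacity) {
        try {
            byte[] buffer = new byte[doubled(capacity)];
            return buffer.length < 0; // never true; allocation succeeded
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int capacity = 1 << 30; // 1 GiB
        // 2 * 2^30 does not fit in a signed 32-bit int and wraps negative.
        System.out.println("doubled size: " + doubled(capacity));
        System.out.println("allocation fails: " + allocationFails(capacity));
    }
}
```

Any size computation that stays in `int` hits this ceiling, which is why a single `Array[Byte]` cannot hold more than 2GB of serialized data.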
This PR proposes two high-level ideas:
1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which has a way to output the underlying buffer as `Array[Array[Byte]]`.
2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in order to handle over 2GB compressed data.
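
The two ideas above can be sketched in Java as follows. This is an illustrative stand-in, not Spark's `ChunkedByteBufferOutputStream`: the class name `ChunkedOutputSketch`, its constructor argument, and the `toChunks()` method are assumptions made for the example. It shows how writing into fixed-size chunks and returning `byte[][]` keeps every individual array small while the total size is tracked in a `long`.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: an output stream that stores data in fixed-size chunks.
public class ChunkedOutputSketch extends OutputStream {
    private final int chunkSize;
    private final List<byte[]> chunks = new ArrayList<>();
    private int posInLast;  // bytes used in the last chunk
    private long total;     // total bytes written; long, so it can exceed 2GB

    public ChunkedOutputSketch(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.posInLast = chunkSize; // forces allocation on the first write
    }

    @Override
    public void write(int b) {
        if (posInLast == chunkSize) {
            // Current chunk is full (or none exists yet): start a new one.
            chunks.add(new byte[chunkSize]);
            posInLast = 0;
        }
        chunks.get(chunks.size() - 1)[posInLast++] = (byte) b;
        total++;
    }

    public long size() {
        return total;
    }

    // Returns the data as an array of arrays; the last chunk is trimmed
    // to the number of bytes actually written.
    public byte[][] toChunks() {
        byte[][] out = new byte[chunks.size()][];
        for (int i = 0; i < chunks.size(); i++) {
            boolean last = (i == chunks.size() - 1);
            byte[] chunk = chunks.get(i);
            out[i] = last ? Arrays.copyOf(chunk, posInLast) : chunk.clone();
        }
        return out;
    }

    public static void main(String[] args) {
        ChunkedOutputSketch out = new ChunkedOutputSketch(4);
        for (int i = 0; i < 10; i++) {
            out.write(i);
        }
        System.out.println("chunks: " + out.toChunks().length + ", bytes: " + out.size());
    }
}
```

With a chunk size of 4 and 10 bytes written, the sketch holds two full chunks and one partial chunk. A caller that previously accepted `byte[]` would need to accept `byte[][]` instead, which mirrors the signature change from `Array[Byte]` to `Array[Array[Byte]]`.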
### Why are the changes needed?
This issue appears to have been missed in the earlier effort to address the 2GB limitations, [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235).
Without this fix, `spark.default.parallelism` needs to be kept low. The drawback of a smaller `spark.default.parallelism` is that it requires more executor memory (more data per partition). Setting `spark.io.compression.zstd.level` to a higher number (default 1) hardly helps.
In effect, the amount of data that can be shuffled is capped, so shuffling does not scale.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite -- -z SPARK-32210"
```
Ran the benchmark using GitHub Actions and did not observe any performance penalties. The results are attached in this PR:
```
core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
```
Closes #33721 from kazuyukitanimura/SPARK-32210.
Authored-by: Kazuyuki Tanimura <ktanimura@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8ee464cd7a)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
67 lines
4.2 KiB
Plaintext
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/ broadcast:     Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                            148           164          8        1.4        739.6      1.0X
Deserialization                                          202           303         72        1.0       1009.9      0.7X

Compressed Serialized MapStatus sizes: 412 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB


OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/o broadcast:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                            125           132          9        1.6        623.4      1.0X
Deserialization                                          197           277         76        1.0        984.4      0.6X

Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/ broadcast:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                            260           286         17        0.8       1302.0      1.0X
Deserialization                                          224           344        128        0.9       1121.0      1.2X

Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB


OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/o broadcast:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                            253           272         14        0.8       1262.9      1.0X
Deserialization                                          240           409        150        0.8       1201.0      1.1X

Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/ broadcast:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                           1361          1378         24        0.1       6805.0      1.0X
Deserialization                                          830          1022        272        0.2       4150.1      1.6X

Compressed Serialized MapStatus sizes: 562 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB


OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/o broadcast:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------------------------
Serialization                                           1216          1251         51        0.2       6078.3      1.0X
Deserialization                                          821           968        138        0.2       4105.8      1.5X

Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes