[SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism
### What changes were proposed in this pull request? The current `MapOutputTracker` class may throw `NegativeArraySizeException` with a large number of partitions. Within the serializeOutputStatuses() method, it is trying to compress an array of mapStatuses and outputting the binary data into (Apache)ByteArrayOutputStream . Inside the (Apache)ByteArrayOutputStream.toByteArray(), negative index exception happens because the index is int and overflows (2GB limit) when the output binary size is too large. This PR proposes two high-level ideas: 1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which has a way to output the underlying buffer as `Array[Array[Byte]]`. 2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in order to handle over 2GB compressed data. ### Why are the changes needed? This issue seems to be missed out in the earlier effort of addressing 2GB limitations [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235) Without this fix, `spark.default.parallelism` needs to be kept at the low number. The drawback of setting smaller spark.default.parallelism is that it requires more executor memory (more data per partition). Setting `spark.io.compression.zstd.level` to higher number (default 1) hardly helps. That essentially means we have the data size limit that for shuffling and does not scale. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed existing tests ``` build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite" ``` Also added a new unit test ``` build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite -- -z SPARK-32210" ``` Ran the benchmark using GitHub Actions and didn't not observe any performance penalties. The results are attached in this PR ``` core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt core/benchmarks/MapStatusesSerDeserBenchmark-results.txt ``` Closes #33721 from kazuyukitanimura/SPARK-32210. Authored-by: Kazuyuki Tanimura <ktanimura@apple.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
f620996142
commit
8ee464cd7a
|
@ -1,64 +1,64 @@
|
|||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
-------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 179 194 9 1.1 897.4 1.0X
|
||||
Deserialization 254 321 74 0.8 1271.0 0.7X
|
||||
Serialization 148 164 8 1.4 739.6 1.0X
|
||||
Deserialization 202 303 72 1.0 1009.9 0.7X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 409 bytes
|
||||
Compressed Serialized MapStatus sizes: 412 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 2 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 160 166 7 1.2 801.2 1.0X
|
||||
Deserialization 256 323 69 0.8 1278.9 0.6X
|
||||
Serialization 125 132 9 1.6 623.4 1.0X
|
||||
Deserialization 197 277 76 1.0 984.4 0.6X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 2 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 341 349 7 0.6 1707.3 1.0X
|
||||
Deserialization 286 370 84 0.7 1431.4 1.2X
|
||||
Serialization 260 286 17 0.8 1302.0 1.0X
|
||||
Deserialization 224 344 128 0.9 1121.0 1.2X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 426 bytes
|
||||
Compressed Serialized MapStatus sizes: 427 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 13 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
---------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 309 319 11 0.6 1543.6 1.0X
|
||||
Deserialization 286 373 117 0.7 1429.5 1.1X
|
||||
Serialization 253 272 14 0.8 1262.9 1.0X
|
||||
Deserialization 240 409 150 0.8 1201.0 1.1X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 13 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
---------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 1619 1627 12 0.1 8092.6 1.0X
|
||||
Deserialization 864 883 26 0.2 4319.9 1.9X
|
||||
Serialization 1361 1378 24 0.1 6805.0 1.0X
|
||||
Deserialization 830 1022 272 0.2 4150.1 1.6X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 557 bytes
|
||||
Compressed Serialized MapStatus sizes: 562 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 121 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
|
||||
OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
|
||||
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
----------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 1449 1456 9 0.1 7246.8 1.0X
|
||||
Deserialization 853 888 46 0.2 4263.7 1.7X
|
||||
Serialization 1216 1251 51 0.2 6078.3 1.0X
|
||||
Deserialization 821 968 138 0.2 4105.8 1.5X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 121 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
|
|
@ -1,64 +1,64 @@
|
|||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
-------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 135 161 56 1.5 673.9 1.0X
|
||||
Deserialization 213 235 26 0.9 1065.6 0.6X
|
||||
Serialization 143 164 55 1.4 716.5 1.0X
|
||||
Deserialization 252 300 43 0.8 1262.4 0.6X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 409 bytes
|
||||
Compressed Serialized MapStatus sizes: 412 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 2 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 130 137 5 1.5 650.8 1.0X
|
||||
Deserialization 211 230 20 0.9 1056.5 0.6X
|
||||
Serialization 137 139 1 1.5 684.2 1.0X
|
||||
Deserialization 252 259 13 0.8 1259.5 0.5X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 2 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
--------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 281 324 86 0.7 1406.7 1.0X
|
||||
Deserialization 240 267 32 0.8 1200.5 1.2X
|
||||
Serialization 279 322 116 0.7 1394.6 1.0X
|
||||
Deserialization 275 287 28 0.7 1372.7 1.0X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 426 bytes
|
||||
Compressed Serialized MapStatus sizes: 427 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 13 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
---------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 265 273 6 0.8 1324.5 1.0X
|
||||
Deserialization 247 276 33 0.8 1236.1 1.1X
|
||||
Serialization 262 263 1 0.8 1310.3 1.0X
|
||||
Deserialization 274 288 22 0.7 1370.5 1.0X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 13 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
---------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 1333 1592 366 0.2 6666.0 1.0X
|
||||
Deserialization 560 585 22 0.4 2799.1 2.4X
|
||||
Serialization 1208 1208 1 0.2 6038.4 1.0X
|
||||
Deserialization 555 783 394 0.4 2774.2 2.2X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 558 bytes
|
||||
Compressed Serialized MapStatus sizes: 562 bytes
|
||||
Compressed Serialized Broadcast MapStatus sizes: 121 MB
|
||||
|
||||
|
||||
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
|
||||
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
|
||||
OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
|
||||
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
|
||||
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
----------------------------------------------------------------------------------------------------------------------------
|
||||
Serialization 1222 1260 54 0.2 6111.7 1.0X
|
||||
Deserialization 539 568 42 0.4 2695.3 2.3X
|
||||
Serialization 1097 1097 1 0.2 5484.2 1.0X
|
||||
Deserialization 554 596 48 0.4 2771.3 2.0X
|
||||
|
||||
Compressed Serialized MapStatus sizes: 121 MB
|
||||
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
|
||||
package org.apache.spark
|
||||
|
||||
import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, ObjectOutputStream}
|
||||
import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream}
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
|
||||
|
@ -40,6 +41,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus}
|
|||
import org.apache.spark.shuffle.MetadataFetchFailedException
|
||||
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
|
||||
import org.apache.spark.util._
|
||||
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
|
||||
|
||||
/**
|
||||
* Helper class used by the [[MapOutputTrackerMaster]] to perform bookkeeping for a single
|
||||
|
@ -121,14 +123,14 @@ private class ShuffleStatus(
|
|||
* broadcast variable in order to keep it from being garbage collected and to allow for it to be
|
||||
* explicitly destroyed later on when the ShuffleMapStage is garbage-collected.
|
||||
*/
|
||||
private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
|
||||
private[spark] var cachedSerializedBroadcast: Broadcast[Array[Array[Byte]]] = _
|
||||
|
||||
/**
|
||||
* Similar to cachedSerializedMapStatus and cachedSerializedBroadcast, but for MergeStatus.
|
||||
*/
|
||||
private[this] var cachedSerializedMergeStatus: Array[Byte] = _
|
||||
|
||||
private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Byte]] = _
|
||||
private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Array[Byte]]] = _
|
||||
|
||||
/**
|
||||
* Counter tracking the number of partitions that have output. This is a performance optimization
|
||||
|
@ -1318,12 +1320,9 @@ private[spark] object MapOutputTracker extends Logging {
|
|||
broadcastManager: BroadcastManager,
|
||||
isLocal: Boolean,
|
||||
minBroadcastSize: Int,
|
||||
conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = {
|
||||
// Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
|
||||
// This implementation doesn't reallocate the whole memory block but allocates
|
||||
// additional buffers. This way no buffers need to be garbage collected and
|
||||
// the contents don't have to be copied to the new buffer.
|
||||
val out = new ApacheByteArrayOutputStream()
|
||||
conf: SparkConf): (Array[Byte], Broadcast[Array[Array[Byte]]]) = {
|
||||
// ByteArrayOutputStream has the 2GB limit so use ChunkedByteBufferOutputStream instead
|
||||
val out = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
|
||||
out.write(DIRECT)
|
||||
val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
|
||||
val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
|
||||
|
@ -1335,13 +1334,19 @@ private[spark] object MapOutputTracker extends Logging {
|
|||
} {
|
||||
objOut.close()
|
||||
}
|
||||
val arr = out.toByteArray
|
||||
if (arr.length >= minBroadcastSize) {
|
||||
val chunkedByteBuf = out.toChunkedByteBuffer
|
||||
val arrSize = out.size
|
||||
if (arrSize >= minBroadcastSize) {
|
||||
// Use broadcast instead.
|
||||
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
|
||||
// arr is a nested Array so that it can handle over 2GB serialized data
|
||||
val arr = chunkedByteBuf.getChunks().map(_.array())
|
||||
val bcast = broadcastManager.newBroadcast(arr, isLocal)
|
||||
// toByteArray creates copy, so we can reuse out
|
||||
out.reset()
|
||||
// Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
|
||||
// This implementation doesn't reallocate the whole memory block but allocates
|
||||
// additional buffers. This way no buffers need to be garbage collected and
|
||||
// the contents don't have to be copied to the new buffer.
|
||||
val out = new ApacheByteArrayOutputStream()
|
||||
out.write(BROADCAST)
|
||||
val oos = new ObjectOutputStream(codec.compressedOutputStream(out))
|
||||
Utils.tryWithSafeFinally {
|
||||
|
@ -1350,10 +1355,10 @@ private[spark] object MapOutputTracker extends Logging {
|
|||
oos.close()
|
||||
}
|
||||
val outArr = out.toByteArray
|
||||
logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arr.length)
|
||||
logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize)
|
||||
(outArr, bcast)
|
||||
} else {
|
||||
(arr, null)
|
||||
(chunkedByteBuf.toArray, null)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1362,13 +1367,12 @@ private[spark] object MapOutputTracker extends Logging {
|
|||
bytes: Array[Byte], conf: SparkConf): Array[T] = {
|
||||
assert (bytes.length > 0)
|
||||
|
||||
def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
|
||||
def deserializeObject(in: InputStream): AnyRef = {
|
||||
val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
|
||||
// The ZStd codec is wrapped in a `BufferedInputStream` which avoids overhead excessive
|
||||
// of JNI call while trying to decompress small amount of data for each element
|
||||
// of `MapStatuses`
|
||||
val objIn = new ObjectInputStream(codec.compressedInputStream(
|
||||
new ByteArrayInputStream(arr, off, len)))
|
||||
val objIn = new ObjectInputStream(codec.compressedInputStream(in))
|
||||
Utils.tryWithSafeFinally {
|
||||
objIn.readObject()
|
||||
} {
|
||||
|
@ -1376,18 +1380,20 @@ private[spark] object MapOutputTracker extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
val in = new ByteArrayInputStream(bytes, 1, bytes.length - 1)
|
||||
bytes(0) match {
|
||||
case DIRECT =>
|
||||
deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[T]]
|
||||
deserializeObject(in).asInstanceOf[Array[T]]
|
||||
case BROADCAST =>
|
||||
try {
|
||||
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
|
||||
val bcast = deserializeObject(bytes, 1, bytes.length - 1).
|
||||
asInstanceOf[Broadcast[Array[Byte]]]
|
||||
val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]]
|
||||
logInfo("Broadcast outputstatuses size = " + bytes.length +
|
||||
", actual size = " + bcast.value.length)
|
||||
", actual size = " + bcast.value.foldLeft(0L)(_ + _.length))
|
||||
val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream()
|
||||
// Important - ignore the DIRECT tag ! Start from offset 1
|
||||
deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[T]]
|
||||
bcastIn.skip(1)
|
||||
deserializeObject(bcastIn).asInstanceOf[Array[T]]
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
logWarning("Exception encountered during deserializing broadcasted" +
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.spark.internal.config._
|
|||
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE}
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
|
||||
import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus, MergeStatus}
|
||||
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus}
|
||||
import org.apache.spark.shuffle.FetchFailedException
|
||||
import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
|
||||
|
||||
|
@ -664,4 +664,74 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
|
|||
tracker.stop()
|
||||
rpcEnv.shutdown()
|
||||
}
|
||||
|
||||
test("SPARK-32210: serialize mapStatuses to a nested Array and deserialize them") {
|
||||
val newConf = new SparkConf
|
||||
|
||||
// needs TorrentBroadcast so need a SparkContext
|
||||
withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
|
||||
val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
|
||||
val rpcEnv = sc.env.rpcEnv
|
||||
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf)
|
||||
rpcEnv.stop(tracker.trackerEndpoint)
|
||||
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
|
||||
val shuffleId = 20
|
||||
val numMaps = 1000
|
||||
|
||||
tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
|
||||
val r = new scala.util.Random(912)
|
||||
(0 until numMaps).foreach { i =>
|
||||
tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
|
||||
BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000),
|
||||
Array.fill[Long](1000)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i))
|
||||
}
|
||||
|
||||
val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
|
||||
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
|
||||
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)
|
||||
assert(serializedBroadcast.value.length > 1)
|
||||
assert(serializedBroadcast.value.dropRight(1).forall(_.length == 1024 * 1024))
|
||||
|
||||
val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
|
||||
assert(result.length == numMaps)
|
||||
|
||||
tracker.unregisterShuffle(shuffleId)
|
||||
tracker.stop()
|
||||
}
|
||||
}
|
||||
|
||||
ignore("SPARK-32210: serialize and deserialize over 2GB compressed mapStatuses") {
|
||||
// This test requires 8GB heap memory settings
|
||||
val newConf = new SparkConf
|
||||
|
||||
// needs TorrentBroadcast so need a SparkContext
|
||||
withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
|
||||
val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
|
||||
val rpcEnv = sc.env.rpcEnv
|
||||
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf)
|
||||
rpcEnv.stop(tracker.trackerEndpoint)
|
||||
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
|
||||
val shuffleId = 20
|
||||
val numMaps = 200000
|
||||
|
||||
tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
|
||||
val r = new scala.util.Random(912)
|
||||
(0 until numMaps).foreach { i =>
|
||||
tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus(
|
||||
BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000, Some(r.nextString(1024 * 5))),
|
||||
Array.fill(10)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i))
|
||||
}
|
||||
|
||||
val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
|
||||
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
|
||||
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf)
|
||||
assert(serializedBroadcast.value.foldLeft(0L)(_ + _.length) > 2L * 1024 * 1024 * 1024)
|
||||
|
||||
val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf)
|
||||
assert(result.length == numMaps)
|
||||
|
||||
tracker.unregisterShuffle(shuffleId)
|
||||
tracker.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,14 +64,14 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase {
|
|||
val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head
|
||||
|
||||
var serializedMapStatusSizes = 0
|
||||
var serializedBroadcastSizes = 0
|
||||
var serializedBroadcastSizes = 0L
|
||||
|
||||
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses(
|
||||
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize,
|
||||
sc.getConf)
|
||||
serializedMapStatusSizes = serializedMapStatus.length
|
||||
if (serializedBroadcast != null) {
|
||||
serializedBroadcastSizes = serializedBroadcast.value.length
|
||||
serializedBroadcastSizes = serializedBroadcast.value.foldLeft(0L)(_ + _.length)
|
||||
}
|
||||
|
||||
benchmark.addCase("Serialization") { _ =>
|
||||
|
|
Loading…
Reference in a new issue