diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 29699a2fdc..0481630a5d 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -1,64 +1,64 @@ -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Serialization 179 194 9 1.1 897.4 1.0X -Deserialization 254 321 74 0.8 1271.0 0.7X +Serialization 148 164 8 1.4 739.6 1.0X +Deserialization 202 303 72 1.0 1009.9 0.7X -Compressed Serialized MapStatus sizes: 409 bytes +Compressed Serialized MapStatus sizes: 412 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -Serialization 160 166 7 1.2 801.2 1.0X -Deserialization 256 323 69 0.8 1278.9 0.6X +Serialization 125 132 9 1.6 623.4 1.0X +Deserialization 197 277 76 1.0 984.4 0.6X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -Serialization 341 349 7 0.6 1707.3 1.0X -Deserialization 286 370 84 0.7 1431.4 1.2X +Serialization 260 286 17 0.8 1302.0 1.0X +Deserialization 224 344 128 0.9 1121.0 1.2X -Compressed Serialized MapStatus sizes: 426 bytes +Compressed Serialized MapStatus sizes: 427 bytes Compressed Serialized Broadcast MapStatus sizes: 13 MB -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -Serialization 309 319 11 0.6 1543.6 1.0X -Deserialization 286 373 117 0.7 1429.5 1.1X +Serialization 253 272 14 0.8 1262.9 1.0X +Deserialization 240 409 150 0.8 1201.0 1.1X Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -Serialization 1619 1627 12 0.1 8092.6 1.0X -Deserialization 864 883 26 0.2 4319.9 1.9X +Serialization 1361 1378 24 0.1 6805.0 1.0X +Deserialization 830 1022 272 0.2 4150.1 1.6X -Compressed Serialized MapStatus sizes: 557 bytes +Compressed Serialized MapStatus sizes: 562 bytes Compressed Serialized Broadcast MapStatus sizes: 121 MB -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------- -Serialization 1449 1456 9 0.1 7246.8 1.0X -Deserialization 853 888 46 0.2 4263.7 1.7X +Serialization 1216 1251 51 0.2 6078.3 1.0X +Deserialization 821 968 138 0.2 4105.8 1.5X Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt index 96fa3a01a8..5b005a522c 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -1,64 +1,64 @@ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Serialization 135 161 56 1.5 673.9 1.0X -Deserialization 213 235 26 0.9 1065.6 0.6X +Serialization 143 164 55 1.4 716.5 1.0X +Deserialization 252 300 43 0.8 1262.4 0.6X -Compressed Serialized MapStatus sizes: 409 bytes +Compressed Serialized MapStatus sizes: 412 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -Serialization 130 137 5 1.5 650.8 1.0X -Deserialization 211 230 20 0.9 1056.5 0.6X +Serialization 137 139 1 1.5 684.2 1.0X +Deserialization 252 259 13 0.8 1259.5 0.5X Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -Serialization 281 324 86 0.7 1406.7 1.0X -Deserialization 240 267 32 0.8 1200.5 1.2X +Serialization 279 322 116 0.7 1394.6 1.0X +Deserialization 275 287 28 0.7 1372.7 1.0X -Compressed Serialized MapStatus sizes: 426 bytes +Compressed Serialized MapStatus sizes: 427 bytes Compressed Serialized Broadcast MapStatus sizes: 13 MB -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -Serialization 265 273 6 0.8 1324.5 1.0X -Deserialization 247 276 33 0.8 1236.1 1.1X +Serialization 262 263 1 0.8 1310.3 1.0X +Deserialization 274 288 22 0.7 1370.5 1.0X Compressed Serialized MapStatus sizes: 13 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -Serialization 1333 1592 366 0.2 6666.0 1.0X -Deserialization 560 585 22 0.4 2799.1 2.4X +Serialization 1208 1208 1 0.2 6038.4 1.0X +Deserialization 555 783 394 0.4 2774.2 2.2X -Compressed Serialized MapStatus sizes: 558 bytes +Compressed Serialized MapStatus sizes: 562 bytes Compressed Serialized Broadcast MapStatus sizes: 121 MB -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz +OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------- -Serialization 1222 1260 54 0.2 6111.7 1.0X -Deserialization 539 568 42 0.4 2695.3 2.3X +Serialization 1097 1097 1 0.2 5484.2 1.0X +Deserialization 554 596 48 0.4 2771.3 2.0X Compressed Serialized MapStatus sizes: 121 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1b25ec5044..24954e7674 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,7 +17,8 @@ package org.apache.spark -import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{ByteArrayInputStream, InputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock @@ -40,6 +41,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus} import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} import org.apache.spark.util._ +import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} /** * Helper class used by the [[MapOutputTrackerMaster]] to perform bookkeeping for a single @@ -121,14 +123,14 @@ private class ShuffleStatus( * broadcast variable in order to keep it from being garbage collected and to allow for it to be * explicitly destroyed later on when the ShuffleMapStage is garbage-collected. */ - private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _ + private[spark] var cachedSerializedBroadcast: Broadcast[Array[Array[Byte]]] = _ /** * Similar to cachedSerializedMapStatus and cachedSerializedBroadcast, but for MergeStatus. */ private[this] var cachedSerializedMergeStatus: Array[Byte] = _ - private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Byte]] = _ + private[this] var cachedSerializedBroadcastMergeStatus: Broadcast[Array[Array[Byte]]] = _ /** * Counter tracking the number of partitions that have output. This is a performance optimization @@ -1318,12 +1320,9 @@ private[spark] object MapOutputTracker extends Logging { broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int, - conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = { - // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one - // This implementation doesn't reallocate the whole memory block but allocates - // additional buffers. This way no buffers need to be garbage collected and - // the contents don't have to be copied to the new buffer. - val out = new ApacheByteArrayOutputStream() + conf: SparkConf): (Array[Byte], Broadcast[Array[Array[Byte]]]) = { + // ByteArrayOutputStream has the 2GB limit so use ChunkedByteBufferOutputStream instead + val out = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate) out.write(DIRECT) val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC)) val objOut = new ObjectOutputStream(codec.compressedOutputStream(out)) @@ -1335,13 +1334,19 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - val arr = out.toByteArray - if (arr.length >= minBroadcastSize) { + val chunkedByteBuf = out.toChunkedByteBuffer + val arrSize = out.size + if (arrSize >= minBroadcastSize) { // Use broadcast instead. // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! + // arr is a nested Array so that it can handle over 2GB serialized data + val arr = chunkedByteBuf.getChunks().map(_.array()) val bcast = broadcastManager.newBroadcast(arr, isLocal) - // toByteArray creates copy, so we can reuse out - out.reset() + // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one + // This implementation doesn't reallocate the whole memory block but allocates + // additional buffers. This way no buffers need to be garbage collected and + // the contents don't have to be copied to the new buffer. + val out = new ApacheByteArrayOutputStream() out.write(BROADCAST) val oos = new ObjectOutputStream(codec.compressedOutputStream(out)) Utils.tryWithSafeFinally { @@ -1350,10 +1355,10 @@ private[spark] object MapOutputTracker extends Logging { oos.close() } val outArr = out.toByteArray - logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arr.length) + logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize) (outArr, bcast) } else { - (arr, null) + (chunkedByteBuf.toArray, null) } } @@ -1362,13 +1367,12 @@ private[spark] object MapOutputTracker extends Logging { bytes: Array[Byte], conf: SparkConf): Array[T] = { assert (bytes.length > 0) - def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { + def deserializeObject(in: InputStream): AnyRef = { val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC)) // The ZStd codec is wrapped in a `BufferedInputStream` which avoids overhead excessive // of JNI call while trying to decompress small amount of data for each element // of `MapStatuses` - val objIn = new ObjectInputStream(codec.compressedInputStream( - new ByteArrayInputStream(arr, off, len))) + val objIn = new ObjectInputStream(codec.compressedInputStream(in)) Utils.tryWithSafeFinally { objIn.readObject() } { @@ -1376,18 +1380,20 @@ private[spark] object MapOutputTracker extends Logging { } } + val in = new ByteArrayInputStream(bytes, 1, bytes.length - 1) bytes(0) match { case DIRECT => - deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[T]] + deserializeObject(in).asInstanceOf[Array[T]] case BROADCAST => try { // deserialize the Broadcast, pull .value array out of it, and then deserialize that - val bcast = deserializeObject(bytes, 1, bytes.length - 1). - asInstanceOf[Broadcast[Array[Byte]]] + val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]] logInfo("Broadcast outputstatuses size = " + bytes.length + - ", actual size = " + bcast.value.length) + ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length)) + val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream() // Important - ignore the DIRECT tag ! Start from offset 1 - deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[T]] + bcastIn.skip(1) + deserializeObject(bcastIn).asInstanceOf[Array[T]] } catch { case e: IOException => logWarning("Exception encountered during deserializing broadcasted" + diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 69cc8c1bce..e81196f8ea 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} -import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus, MergeStatus} +import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} @@ -664,4 +664,74 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { tracker.stop() rpcEnv.shutdown() } + + test("SPARK-32210: serialize mapStatuses to a nested Array and deserialize them") { + val newConf = new SparkConf + + // needs TorrentBroadcast so need a SparkContext + withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => + val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + val shuffleId = 20 + val numMaps = 1000 + + tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) + val r = new scala.util.Random(912) + (0 until numMaps).foreach { i => + tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus( + BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), + Array.fill[Long](1000)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i)) + } + + val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head + val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf) + assert(serializedBroadcast.value.length > 1) + assert(serializedBroadcast.value.dropRight(1).forall(_.length == 1024 * 1024)) + + val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf) + assert(result.length == numMaps) + + tracker.unregisterShuffle(shuffleId) + tracker.stop() + } + } + + ignore("SPARK-32210: serialize and deserialize over 2GB compressed mapStatuses") { + // This test requires 8GB heap memory settings + val newConf = new SparkConf + + // needs TorrentBroadcast so need a SparkContext + withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => + val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + val shuffleId = 20 + val numMaps = 200000 + + tracker.registerShuffle(shuffleId, numMaps, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) + val r = new scala.util.Random(912) + (0 until numMaps).foreach { i => + tracker.registerMapOutput(shuffleId, i, HighlyCompressedMapStatus( + BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000, Some(r.nextString(1024 * 5))), + Array.fill(10)((r.nextDouble() * 1024 * 1024 * 1024).toLong), i)) + } + + val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head + val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, 0, sc.getConf) + assert(serializedBroadcast.value.foldLeft(0L)(_ + _.length) > 2L * 1024 * 1024 * 1024) + + val result = MapOutputTracker.deserializeOutputStatuses(serializedMapStatus, sc.getConf) + assert(result.length == numMaps) + + tracker.unregisterShuffle(shuffleId) + tracker.stop() + } + } } diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala index d808823987..bb627bb181 100644 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala @@ -64,14 +64,14 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase { val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head var serializedMapStatusSizes = 0 - var serializedBroadcastSizes = 0 + var serializedBroadcastSizes = 0L val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeOutputStatuses( shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize, sc.getConf) serializedMapStatusSizes = serializedMapStatus.length if (serializedBroadcast != null) { - serializedBroadcastSizes = serializedBroadcast.value.length + serializedBroadcastSizes = serializedBroadcast.value.foldLeft(0L)(_ + _.length) } benchmark.addCase("Serialization") { _ =>