[SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus

### What changes were proposed in this pull request?
Instead of using the ZStd codec directly, we use Spark's CompressionCodec, which wraps the ZStd codec in a buffered stream to avoid the excessive overhead of JNI calls when compressing or decompressing small amounts of data.

Also, by using Spark's CompressionCodec, we can easily make the codec configurable in the future if needed.
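
The effect of the buffered wrapper can be illustrated without Spark or zstd-jni. The following is a minimal sketch, not the code in this PR: `CountingOutputStream` is a hypothetical stand-in for a JNI-backed compressor stream that only counts how many write calls reach it, and the 32 KiB buffer size is an assumption chosen for illustration. It uses only `java.io`.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, OutputStream}

object BufferingSketch {
  // Hypothetical stand-in for a native (JNI-backed) compressor stream:
  // it forwards the data and counts how many write calls reach it.
  final class CountingOutputStream(underlying: OutputStream) extends OutputStream {
    var calls: Int = 0
    override def write(b: Int): Unit = { calls += 1; underlying.write(b) }
    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      calls += 1
      underlying.write(b, off, len)
    }
  }

  // Writes `n` tiny 4-byte records and returns how many write calls
  // reached the counting stream, with or without a buffer in front of it.
  def callsFor(n: Int, buffered: Boolean): Int = {
    val counting = new CountingOutputStream(new ByteArrayOutputStream())
    val out: OutputStream =
      if (buffered) new BufferedOutputStream(counting, 32 * 1024) else counting
    val record = Array[Byte](1, 2, 3, 4)
    (0 until n).foreach(_ => out.write(record))
    out.close() // flushes any bytes still held in the buffer
    counting.calls
  }

  def main(args: Array[String]): Unit = {
    val unbuffered = callsFor(10000, buffered = false)
    val buffered = callsFor(10000, buffered = true)
    println(s"unbuffered calls: $unbuffered, buffered calls: $buffered")
  }
}
```

Without the buffer, every small write reaches the underlying stream. With the buffer, the small writes are coalesced into a few large ones, which is the same reason a buffered codec stream needs fewer native calls when `ObjectOutputStream` emits many small chunks.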

### Why are the changes needed?
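Faster performance. In the benchmark results updated by this PR (JDK 11, 200000 MapOutputs, 1000 blocks w/ broadcast), the best serialization time drops from 4740 ms to 1738 ms and the best deserialization time drops from 1578 ms to 946 ms.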

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #26235 from dbtsai/optimizeDeser.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
DB Tsai 2019-10-23 18:17:37 -07:00 committed by Dongjoon Hyun
parent 7ecf968527
commit fd899d6331
4 changed files with 72 additions and 83 deletions

@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 205 213 13 1.0 1023.6 1.0X
Deserialization 908 939 27 0.2 4540.2 0.2X
Serialization 170 178 9 1.2 849.7 1.0X
Deserialization 530 535 9 0.4 2651.1 0.3X
Compressed Serialized MapStatus sizes: 400 bytes
Compressed Serialized MapStatus sizes: 411 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB
@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 195 204 24 1.0 976.9 1.0X
Deserialization 913 940 33 0.2 4566.7 0.2X
Serialization 157 165 7 1.3 785.4 1.0X
Deserialization 495 588 79 0.4 2476.7 0.3X
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 616 619 3 0.3 3079.1 1.0X
Deserialization 936 954 22 0.2 4680.5 0.7X
Serialization 344 351 4 0.6 1720.4 1.0X
Deserialization 527 579 99 0.4 2635.9 0.7X
Compressed Serialized MapStatus sizes: 418 bytes
Compressed Serialized Broadcast MapStatus sizes: 14 MB
Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 586 588 3 0.3 2928.8 1.0X
Deserialization 929 933 4 0.2 4647.0 0.6X
Serialization 317 321 4 0.6 1583.8 1.0X
Deserialization 530 540 15 0.4 2648.3 0.6X
Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 4740 4916 249 0.0 23698.5 1.0X
Deserialization 1578 1597 27 0.1 7890.6 3.0X
Serialization 1738 1849 156 0.1 8692.0 1.0X
Deserialization 946 977 33 0.2 4730.2 1.8X
Compressed Serialized MapStatus sizes: 546 bytes
Compressed Serialized Broadcast MapStatus sizes: 123 MB
Compressed Serialized MapStatus sizes: 556 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 4492 4573 115 0.0 22458.3 1.0X
Deserialization 1533 1547 20 0.1 7664.8 2.9X
Serialization 1379 1432 76 0.1 6892.6 1.0X
Deserialization 929 941 19 0.2 4645.5 1.5X
Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes

@ -2,10 +2,10 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 236 245 18 0.8 1179.1 1.0X
Deserialization 842 885 37 0.2 4211.4 0.3X
Serialization 178 187 15 1.1 887.5 1.0X
Deserialization 530 558 32 0.4 2647.5 0.3X
Compressed Serialized MapStatus sizes: 400 bytes
Compressed Serialized MapStatus sizes: 411 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB
@ -13,8 +13,8 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 213 219 8 0.9 1065.1 1.0X
Deserialization 846 870 33 0.2 4228.6 0.3X
Serialization 167 175 7 1.2 835.7 1.0X
Deserialization 523 537 22 0.4 2616.2 0.3X
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 624 709 167 0.3 3121.1 1.0X
Deserialization 885 908 22 0.2 4427.0 0.7X
Serialization 351 416 147 0.6 1754.4 1.0X
Deserialization 546 551 8 0.4 2727.6 0.6X
Compressed Serialized MapStatus sizes: 418 bytes
Compressed Serialized Broadcast MapStatus sizes: 14 MB
Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 603 604 2 0.3 3014.9 1.0X
Deserialization 892 895 5 0.2 4458.7 0.7X
Serialization 320 321 1 0.6 1598.0 1.0X
Deserialization 542 549 7 0.4 2709.0 0.6X
Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 4612 4945 471 0.0 23061.0 1.0X
Deserialization 1493 1495 2 0.1 7466.3 3.1X
Serialization 1671 1877 290 0.1 8357.3 1.0X
Deserialization 943 970 32 0.2 4715.8 1.8X
Compressed Serialized MapStatus sizes: 546 bytes
Compressed Serialized Broadcast MapStatus sizes: 123 MB
Compressed Serialized MapStatus sizes: 556 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 4452 4595 202 0.0 22261.4 1.0X
Deserialization 1464 1477 18 0.1 7321.4 3.0X
Serialization 1373 1436 89 0.1 6865.0 1.0X
Deserialization 940 970 37 0.2 4699.1 1.5X
Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes

@ -28,13 +28,12 @@ import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import com.github.luben.zstd.ZstdInputStream
import com.github.luben.zstd.ZstdOutputStream
import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus}
import org.apache.spark.shuffle.MetadataFetchFailedException
@ -195,7 +194,8 @@ private class ShuffleStatus(numPartitions: Int) {
def serializedMapStatus(
broadcastManager: BroadcastManager,
isLocal: Boolean,
minBroadcastSize: Int): Array[Byte] = {
minBroadcastSize: Int,
conf: SparkConf): Array[Byte] = {
var result: Array[Byte] = null
withReadLock {
@ -207,7 +207,7 @@ private class ShuffleStatus(numPartitions: Int) {
if (result == null) withWriteLock {
if (cachedSerializedMapStatus == null) {
val serResult = MapOutputTracker.serializeMapStatuses(
mapStatuses, broadcastManager, isLocal, minBroadcastSize)
mapStatuses, broadcastManager, isLocal, minBroadcastSize, conf)
cachedSerializedMapStatus = serResult._1
cachedSerializedBroadcast = serResult._2
}
@ -450,7 +450,8 @@ private[spark] class MapOutputTrackerMaster(
" to " + hostPort)
val shuffleStatus = shuffleStatuses.get(shuffleId).head
context.reply(
shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast))
shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast,
conf))
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
@ -799,7 +800,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
val statuses = getStatuses(shuffleId, conf)
try {
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses)
@ -818,7 +819,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex " +
s"partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
val statuses = getStatuses(shuffleId, conf)
try {
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition,
statuses, Some(mapIndex))
@ -836,7 +837,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
*
* (It would be nice to remove this restriction in the future.)
*/
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
@ -846,7 +847,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
if (fetchedStatuses == null) {
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
}
@ -890,16 +891,20 @@ private[spark] object MapOutputTracker extends Logging {
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will
// generally be pretty compressible because many map outputs will be on the same hostname.
def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
def serializeMapStatuses(
statuses: Array[MapStatus],
broadcastManager: BroadcastManager,
isLocal: Boolean,
minBroadcastSize: Int,
conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]]) = {
// Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
// This implementation doesn't reallocate the whole memory block but allocates
// additional buffers. This way no buffers need to be garbage collected and
// the contents don't have to be copied to the new buffer.
val out = new ApacheByteArrayOutputStream()
val compressedOut = new ApacheByteArrayOutputStream()
val objOut = new ObjectOutputStream(out)
out.write(DIRECT)
val codec = CompressionCodec.createCodec(conf, "zstd")
val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
@ -908,42 +913,21 @@ private[spark] object MapOutputTracker extends Logging {
} {
objOut.close()
}
val arr: Array[Byte] = {
val zos = new ZstdOutputStream(compressedOut)
Utils.tryWithSafeFinally {
compressedOut.write(DIRECT)
// `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
// without copying to avoid unnecessary allocation and copy of byte[].
out.writeTo(zos)
} {
zos.close()
}
compressedOut.toByteArray
}
val arr = out.toByteArray
if (arr.length >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
val bcast = broadcastManager.newBroadcast(arr, isLocal)
// toByteArray creates copy, so we can reuse out
out.reset()
val oos = new ObjectOutputStream(out)
out.write(BROADCAST)
val oos = new ObjectOutputStream(codec.compressedOutputStream(out))
Utils.tryWithSafeFinally {
oos.writeObject(bcast)
} {
oos.close()
}
val outArr = {
compressedOut.reset()
val zos = new ZstdOutputStream(compressedOut)
Utils.tryWithSafeFinally {
compressedOut.write(BROADCAST)
out.writeTo(zos)
} {
zos.close()
}
compressedOut.toByteArray
}
val outArr = out.toByteArray
logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
(outArr, bcast)
} else {
@ -952,11 +936,15 @@ private[spark] object MapOutputTracker extends Logging {
}
// Opposite of serializeMapStatuses.
def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
def deserializeMapStatuses(bytes: Array[Byte], conf: SparkConf): Array[MapStatus] = {
assert (bytes.length > 0)
def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
val objIn = new ObjectInputStream(new ZstdInputStream(
val codec = CompressionCodec.createCodec(conf, "zstd")
// The ZStd codec is wrapped in a `BufferedInputStream`, which avoids the excessive overhead
// of JNI calls when decompressing a small amount of data for each element
// of `MapStatuses`
val objIn = new ObjectInputStream(codec.compressedInputStream(
new ByteArrayInputStream(arr, off, len)))
Utils.tryWithSafeFinally {
objIn.readObject()

@ -67,19 +67,20 @@ object MapStatusesSerDeserBenchmark extends BenchmarkBase {
var serializedBroadcastSizes = 0
val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses(
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize)
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize,
sc.getConf)
serializedMapStatusSizes = serializedMapStatus.length
if (serializedBroadcast != null) {
serializedBroadcastSizes = serializedBroadcast.value.length
}
benchmark.addCase("Serialization") { _ =>
MapOutputTracker.serializeMapStatuses(
shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize)
MapOutputTracker.serializeMapStatuses(shuffleStatus.mapStatuses, tracker.broadcastManager,
tracker.isLocal, minBroadcastSize, sc.getConf)
}
benchmark.addCase("Deserialization") { _ =>
val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus)
val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus, sc.getConf)
assert(result.length == numMaps)
}