diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ca1229a737..588f7d2815 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -436,6 +436,8 @@ private[spark] case class GetMapOutputMessage(shuffleId: Int,
     context: RpcCallContext) extends MapOutputTrackerMasterMessage
 private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
     context: RpcCallContext) extends MapOutputTrackerMasterMessage
+private[spark] case class MapSizesByExecutorId(
+    iter: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], enableBatchFetch: Boolean)
 
 /** RpcEndpoint class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterEndpoint(
@@ -512,12 +514,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
   }
 
+  // For testing
+  def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
+    : MapSizesByExecutorId = {
+    getPushBasedShuffleMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
+  }
+
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
    * needs to be read from a given range of map output partitions (startPartition is included but
    * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
-   * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
-   * changed to the length of total map outputs.
+   * but endMapIndex is excluded) when push-based shuffle is not enabled for the specific shuffle
+   * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
+   * of total map outputs.
    *
    * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
@@ -529,7 +538,34 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    val mapSizesByExecutorId = getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    assert(mapSizesByExecutorId.enableBatchFetch)
+    mapSizesByExecutorId.iter
+  }
+
+  /**
+   * Called from executors to get the server URIs and output sizes for each shuffle block that
+   * needs to be read from a given range of map output partitions (startPartition is included but
+   * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
+   * but endMapIndex is excluded) when push-based shuffle is enabled for the specific shuffle
+   * dependency. If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length
+   * of total map outputs.
+   *
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result.
+   *         The second attribute is a boolean flag indicating whether batch fetch can be enabled.
+   */
+  def getPushBasedShuffleMapSizesByExecutorId(
+      shuffleId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startPartition: Int,
+      endPartition: Int): MapSizesByExecutorId
 
   /**
    * Called from executors upon fetch failure on an entire merged shuffle reduce partition.
@@ -1060,12 +1096,12 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   // This method is only called in local-mode.
-  def getMapSizesByExecutorId(
+  def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
@@ -1077,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
             shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
         }
       case None =>
-        Iterator.empty
+        MapSizesByExecutorId(Iterator.empty, true)
     }
   }
 
@@ -1138,12 +1174,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private val fetchingLock = new KeyLock[Int]
 
-  override def getMapSizesByExecutorId(
+  override def getPushBasedShuffleMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf)
     try {
@@ -1439,10 +1475,10 @@ private[spark] object MapOutputTracker extends Logging {
       mapStatuses: Array[MapStatus],
       startMapIndex : Int,
       endMapIndex: Int,
-      mergeStatuses: Option[Array[MergeStatus]] = None):
-      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
     assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
+    var enableBatchFetch = true
     // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
     // partition consists of blocks merged in random order, we are unable to serve map index
     // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
@@ -1451,8 +1487,10 @@ private[spark] object MapOutputTracker extends Logging {
     // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
     // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
     // TODO: map indexes
-    if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
+    if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
       && endMapIndex == mapStatuses.length) {
+      enableBatchFetch = false
+      logDebug(s"Disabling shuffle batch fetch as push-based shuffle is enabled for $shuffleId.")
       // We have MergeStatus and full range of mapIds are requested so return a merged block.
       val numMaps = mapStatuses.length
       mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
@@ -1497,7 +1535,7 @@ private[spark] object MapOutputTracker extends Logging {
       }
     }
 
-    splitsByAddress.mapValues(_.toSeq).iterator
+    MapSizesByExecutorId(splitsByAddress.mapValues(_.toSeq).iterator, enableBatchFetch)
   }
 
   /**
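Taken together, the MapOutputTracker changes reduce to a single predicate: batch fetch stays enabled unless at least one non-null MergeStatus exists and the caller requests the full mapper range. A minimal standalone Scala sketch of that rule (a hypothetical helper with simplified types, not the actual convertMapStatuses signature):

```scala
// Hedged sketch of the batch-fetch decision implemented in convertMapStatuses.
// AnyRef stands in for MergeStatus to keep the sketch self-contained.
def batchFetchEnabled(
    mergeStatuses: Option[Array[AnyRef]],
    startMapIndex: Int,
    endMapIndex: Int,
    numMaps: Int): Boolean = {
  // Merged blocks are only served for full-range requests, and once they are
  // served, the contiguous-blocks assumption behind batch fetch no longer holds.
  val hasMergedBlocks = mergeStatuses.exists(_.exists(_ != null))
  !(hasMergedBlocks && startMapIndex == 0 && endMapIndex == numMaps)
}
```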
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d3cc5ed107..e8c7f1f4d9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -129,11 +129,21 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
+    val (blocksByAddress, canEnableBatchFetch) =
+      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+        val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (res.iter, res.enableBatchFetch)
+      } else {
+        val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
+          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
+        (address, true)
+      }
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
+      shouldBatchFetch =
+        canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
   }
 
   /** Get a writer for a given partition. Called on executors by map tasks. */
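The reader-side composition above can be summarized in a small standalone sketch (hypothetical simplified types; the real call sites are in the hunk above), showing how the tracker's flag combines with the existing `canUseBatchFetch` heuristic:

```scala
// Hedged sketch, not Spark source: models getReader's choice between the
// push-based and regular tracker lookups, and the final shouldBatchFetch.
final case class MapSizes[T](iter: Iterator[T], enableBatchFetch: Boolean)

def resolveBlocks[T](
    shuffleMergeEnabled: Boolean,
    pushBasedLookup: () => MapSizes[T], // getPushBasedShuffleMapSizesByExecutorId
    regularLookup: () => Iterator[T],   // getMapSizesByExecutorId
    canUseBatchFetch: Boolean): (Iterator[T], Boolean) = {
  val (blocks, canEnableBatchFetch) =
    if (shuffleMergeEnabled) {
      val res = pushBasedLookup()
      (res.iter, res.enableBatchFetch)
    } else {
      // Without push-based merging, the tracker never vetoes batch fetch.
      (regularLookup(), true)
    }
  // Batch fetch requires both the tracker's consent and the local heuristic.
  (blocks, canEnableBatchFetch && canUseBatchFetch)
}
```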
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 4051118572..8bebecfe14 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -362,6 +362,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map sizes with merged shuffle") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
 
@@ -391,7 +392,9 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       bitmap, 3000L))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
-    assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
+    assert(mapSizesByExecutorId.iter.toSeq ===
       Seq((blockMgrId, ArrayBuffer((ShuffleMergedBlockId(10, 0, 0), 3000, -1),
         (ShuffleBlockId(10, 2, 0), size1000, 2)))))
 
@@ -404,6 +407,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map statuses from merged shuffle") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
 
@@ -436,6 +440,8 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       bitmap, 4000L))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
     assert(slaveTracker.getMapSizesForMergeResult(10, 0).toSeq ===
       Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
         (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
@@ -449,6 +455,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   test("SPARK-32921: get map statuses for merged shuffle block chunks") {
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     val hostname = "localhost"
     val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
 
@@ -736,4 +743,115 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       tracker.stop()
     }
   }
+
+  test("SPARK-36892: Batch fetch should be enabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+
+    val hostname = "localhost"
+    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
+
+    val masterTracker = newTrackerMaster()
+    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
+
+    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    slaveTracker.trackerEndpoint =
+      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+    masterTracker.registerShuffle(10, 4, 1)
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val blockMgrId = BlockManagerId("a", "hostA", 1000)
+    masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L), 0))
+    masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L), 1))
+    masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L), 2))
+    masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L), 3))
+
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val mapSizesByExecutorId = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
+    // Batch fetch should be enabled when there are no merged shuffle files
+    assert(mapSizesByExecutorId.enableBatchFetch === true)
+    assert(mapSizesByExecutorId.iter.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000, 0),
+        (ShuffleBlockId(10, 1, 0), size1000, 1), (ShuffleBlockId(10, 2, 0), size1000, 2),
+        (ShuffleBlockId(10, 3, 0), size1000, 3)))))
+
+    masterTracker.registerShuffle(11, 4, 1)
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val bitmap = new RoaringBitmap()
+    bitmap.add(0)
+    bitmap.add(1)
+    bitmap.add(3)
+
+    masterTracker.registerMergeResult(11, 0, MergeStatus(blockMgrId, 0,
+      bitmap, 3000L))
+    masterTracker.registerMapOutput(11, 0, MapStatus(blockMgrId, Array(1000L), 0))
+    masterTracker.registerMapOutput(11, 1, MapStatus(blockMgrId, Array(1000L), 1))
+    masterTracker.registerMapOutput(11, 2, MapStatus(blockMgrId, Array(1000L), 2))
+    masterTracker.registerMapOutput(11, 3, MapStatus(blockMgrId, Array(1000L), 3))
+
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    val mapSizesByExecutorId2 = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(11, 0, 2, 0, 1)
+    // Batch fetch should be enabled when only a subset of mapper outputs is fetched
+    assert(mapSizesByExecutorId2.enableBatchFetch === true)
+    assert(mapSizesByExecutorId2.iter.toSeq ===
+      Seq((blockMgrId, ArrayBuffer((ShuffleBlockId(11, 0, 0), size1000, 0),
+        (ShuffleBlockId(11, 1, 0), size1000, 1)))))
+
+    masterTracker.unregisterShuffle(10)
+    masterTracker.unregisterShuffle(11)
+    masterTracker.stop()
+    slaveTracker.stop()
+    rpcEnv.shutdown()
+    slaveRpcEnv.shutdown()
+  }
+
+  test("SPARK-36892: Batch fetch should be disabled in some scenarios with push based shuffle") {
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set(IS_TESTING, true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+
+    val hostname = "localhost"
+    val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf))
+
+    val masterTracker = newTrackerMaster()
+    masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
+
+    val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
+    val slaveTracker = new MapOutputTrackerWorker(conf)
+    slaveTracker.trackerEndpoint =
+      slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+    masterTracker.registerShuffle(10, 4, 2)
+    assert(masterTracker.containsShuffle(10))
+
+    val blockMgrId = BlockManagerId("a", "hostA", 1000)
+    masterTracker.registerMapOutput(10, 0, MapStatus(blockMgrId, Array(1000L, 1000L), 0))
+    masterTracker.registerMapOutput(10, 1, MapStatus(blockMgrId, Array(1000L, 1000L), 1))
+    masterTracker.registerMapOutput(10, 2, MapStatus(blockMgrId, Array(1000L, 1000L), 2))
+    masterTracker.registerMapOutput(10, 3, MapStatus(blockMgrId, Array(1000L, 1000L), 3))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val bitmap = new RoaringBitmap()
+    bitmap.add(0)
+    bitmap.add(1)
+    masterTracker.registerMergeResult(10, 0, MergeStatus(blockMgrId, 0,
+      bitmap, 2000L))
+    masterTracker.registerMergeResult(10, 1, MergeStatus(blockMgrId, 0,
+      bitmap, 2000L))
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+    // Query for all mapper outputs for multiple reducers; since there are merged shuffles,
+    // batch fetch should be disabled.
+    val mapSizesByExecutorId =
+      slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0, Int.MaxValue, 0, 2)
+    assert(mapSizesByExecutorId.enableBatchFetch === false)
+    masterTracker.unregisterShuffle(10)
+    masterTracker.stop()
+    rpcEnv.shutdown()
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 48555d847a..625cb2390f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3155,7 +3155,7 @@ See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this fe
 
 # Push-based shuffle overview
 
-Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO.
+Push-based shuffle helps improve the reliability and performance of spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, resulting in converting small random disk reads by external shuffle services into large sequential reads. Possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch in some scenarios, such as partition coalescing when merged output is available.

 Push-based shuffle improves performance for long running jobs/queries which involves large disk I/O during shuffle. Currently it is not well suited for jobs/queries which runs quickly dealing with lesser amount of shuffle data. This will be further improved in the future releases.
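For reference, the new flag can be observed directly from a worker-side tracker, as the tests above do. A hedged usage sketch (assumes the suite's setup with shuffle 10 registered on `slaveTracker`; production code reaches the tracker via `SparkEnv.get.mapOutputTracker`):

```scala
// Not standalone: relies on the tracker setup shown in the test suite above.
val sizes = slaveTracker.getPushBasedShuffleMapSizesByExecutorId(10, 0)
if (sizes.enableBatchFetch) {
  // No usable merged outputs: contiguous shuffle blocks may be batch fetched.
} else {
  // Merged (push-based) outputs are being served: batch fetch is turned off.
}
val blocksByAddress = sizes.iter // Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
```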