[SPARK-32919][FOLLOW-UP] Filter out driver in the merger locations and fix the return type of RemoveShufflePushMergerLocations

### What changes were proposed in this pull request?

SPARK-32919 added support for fetching shuffle push merger locations with push-based shuffle. This follow-up filters the driver host out of the shuffle push merger locations, since the driver does not participate in the shuffle merge, and fixes a ClassCastException in the handling of RemoveShufflePushMergerLocation.
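
For illustration only, a minimal self-contained sketch of the selection behaviour the first change gives getShufflePushMergerLocations; the Id class, the mergerHosts helper and the host names below are stand-ins for this description, not Spark's BlockManagerId or the real method:

```scala
// Hypothetical stand-ins for Spark internals, for illustration only.
object MergerLocationSketch {
  final case class Id(executorId: String, host: String) {
    // Mirrors the idea behind BlockManagerId.isDriver: the driver registers a
    // block manager too, identified by a well-known executor id.
    def isDriver: Boolean = executorId == "driver"
  }

  // Sketch of the fixed selection: drop driver block managers first, then drop
  // explicitly excluded hosts, and use the remaining hosts as merger candidates.
  def mergerHosts(byExecutor: Map[String, Id], hostsToFilter: Set[String]): Set[String] =
    byExecutor
      .filterNot(_._2.isDriver)
      .values.map(_.host).toSet
      .filterNot(hostsToFilter.contains)

  def main(args: Array[String]): Unit = {
    val registered = Map(
      "driver" -> Id("driver", "host-driver"),
      "execA" -> Id("execA", "hostA"),
      "execB" -> Id("execB", "hostB"))
    // Prints Set(hostA): hostB is excluded explicitly, host-driver because it is the driver.
    println(mergerHosts(registered, Set("hostB")))
  }
}
```
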
### Why are the changes needed?

These are bug fixes: without them the driver host can be returned as a shuffle push merger location even though the driver does not participate in the shuffle merge, and removeShufflePushMergerLocation fails with a ClassCastException because the reply of the RemoveShufflePushMergerLocation RPC is requested as Seq[BlockManagerId] instead of Unit.
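
As a hedged sketch of that second failure mode (plain Scala stand-ins, not Spark's real RPC classes): the unchecked cast in an askSync-style helper only fails when the caller consumes the reply under the wrong type, which is why asking for Unit avoids it.

```scala
// Minimal sketch of the askSync type mismatch; the message/reply shapes here are
// assumptions for illustration, not Spark's real RemoveShufflePushMergerLocation handling.
object AskSyncSketch {
  // Stand-in endpoint: acknowledges the removal request with a Boolean.
  private def handle(message: Any): Any = message match {
    case ("RemoveShufflePushMergerLocation", _: String) => true
  }

  // Stand-in for an askSync[T]-style call: the cast to T is unchecked here,
  // so a wrong T only fails when the caller consumes the reply.
  private def askSync[T](message: Any): T = handle(message).asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    // The fix: ask for Unit and simply discard the acknowledgement.
    askSync[Unit](("RemoveShufflePushMergerLocation", "hostA"))

    // The old shape: asking for a Seq compiles, but blows up at runtime.
    try {
      val locations = askSync[Seq[String]](("RemoveShufflePushMergerLocation", "hostA"))
      println(locations.size)
    } catch {
      case e: ClassCastException => println(s"ClassCastException as in the bug: ${e.getMessage}")
    }
  }
}
```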

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests.

Closes #33425 from venkata91/SPARK-32919-follow-up.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Venkata krishnan Sowrirajan 2021-08-01 13:11:34 -05:00 committed by Mridul Muralidharan
parent 22c49226f7
commit 2a18f82940
3 changed files with 10 additions and 3 deletions

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

@@ -142,7 +142,7 @@ class BlockManagerMaster(
    * @param host
    */
   def removeShufflePushMergerLocation(host: String): Unit = {
-    driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host))
+    driverEndpoint.askSync[Unit](RemoveShufflePushMergerLocation(host))
   }

   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

@@ -715,7 +715,8 @@ class BlockManagerMasterEndpoint(
   private def getShufflePushMergerLocations(
       numMergersNeeded: Int,
       hostsToFilter: Set[String]): Seq[BlockManagerId] = {
-    val blockManagerHosts = blockManagerIdByExecutor.values.map(_.host).toSet
+    val blockManagerHosts = blockManagerIdByExecutor
+      .filterNot(_._2.isDriver).values.map(_.host).toSet
     val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
     val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
       BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -2084,7 +2084,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     makeBlockManager(100, "execE",
       transferService = Some(new MockBlockTransferService(10, "hostA")))
     assert(master.getShufflePushMergerLocations(5, Set.empty).size == 4)
+    assert(master.getExecutorEndpointRef(SparkContext.DRIVER_IDENTIFIER).isEmpty)
+    makeBlockManager(100, SparkContext.DRIVER_IDENTIFIER,
+      transferService = Some(new MockBlockTransferService(10, "host-driver")))
+    assert(master.getExecutorEndpointRef(SparkContext.DRIVER_IDENTIFIER).isDefined)
     master.removeExecutor("execA")
     master.removeExecutor("execE")
@@ -2093,6 +2096,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       Seq("hostC", "hostB", "hostD").sorted)
     assert(master.getShufflePushMergerLocations(4, Set.empty).map(_.host).sorted ===
       Seq("hostB", "hostA", "hostC", "hostD").sorted)
+    master.removeShufflePushMergerLocation("hostA")
+    assert(master.getShufflePushMergerLocations(4, Set.empty).map(_.host).sorted ===
+      Seq("hostB", "hostC", "hostD").sorted)
   }

   test("SPARK-33387 Support ordered shuffle block migration") {