From 1bd897cbc4fe30eb8b7740c7232aae87081e8e33 Mon Sep 17 00:00:00 2001 From: Ye Zhou Date: Mon, 23 Nov 2020 15:16:20 -0600 Subject: [PATCH] [SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle ### What changes were proposed in this pull request? This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. Summary of changes: This PR introduces a new RPC to be called within Driver. When the expected shuffle push wait time reaches, Driver will call this RPC to facilitate coordination of shuffle map/reduce stages and notify external shuffle services to finalize shuffle block merge for a given shuffle. Shuffle services also respond back the metadata about a merged shuffle partition back to the caller. ### Why are the changes needed? Refer to the SPIP in SPARK-30602. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This code snippets won't be called by any existing code and will be tested after the coordinated driver changes gets merged in SPARK-32920. Lead-authored-by: Min Shen mshenlinkedin.com Closes #30163 from zhouyejoe/SPARK-32918. Lead-authored-by: Ye Zhou Co-authored-by: Min Shen Signed-off-by: Mridul Muralidharan gmail.com> --- .../network/shuffle/BlockStoreClient.java | 22 ++++++++++ .../shuffle/ExternalBlockStoreClient.java | 29 +++++++++++++ .../shuffle/MergeFinalizerListener.java | 43 +++++++++++++++++++ 3 files changed, 94 insertions(+) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 37befcd4b6..a6bdc13e93 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -147,6 +147,8 @@ public abstract class BlockStoreClient implements Closeable { * @param blockIds block ids to be pushed * @param buffers buffers to be pushed * @param listener the listener to receive block push status. + * + * @since 3.1.0 */ public void pushBlocks( String host, @@ -156,4 +158,24 @@ public abstract class BlockStoreClient implements Closeable { BlockFetchingListener listener) { throw new UnsupportedOperationException(); } + + /** + * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge + * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly + * finishing the shuffle merge process associated with the shuffle mapper stage. + * + * @param host host of shuffle server + * @param port port of shuffle server. + * @param shuffleId shuffle ID of the shuffle to be finalized + * @param listener the listener to receive MergeStatuses + * + * @since 3.1.0 + */ + public void finalizeShuffleMerge( + String host, + int port, + int shuffleId, + MergeFinalizerListener listener) { + throw new UnsupportedOperationException(); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index eca35ed290..56c06e640a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -158,6 +158,35 @@ public class ExternalBlockStoreClient extends BlockStoreClient { } } + @Override + public void finalizeShuffleMerge( + String host, + int port, + int shuffleId, + MergeFinalizerListener listener) { + checkInit(); + try { + TransportClient client = clientFactory.createClient(host, port); + ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer(); + client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() { + @Override + public void onSuccess(ByteBuffer response) { + listener.onShuffleMergeSuccess( + (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response)); + } + + @Override + public void onFailure(Throwable e) { + listener.onShuffleMergeFailure(e); + } + }); + } catch (Exception e) { + logger.error("Exception while sending finalizeShuffleMerge request to {}:{}", + host, port, e); + listener.onShuffleMergeFailure(e); + } + } + @Override public MetricSet shuffleMetrics() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java new file mode 100644 index 0000000000..08e13eea9f --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.EventListener; + +import org.apache.spark.network.shuffle.protocol.MergeStatuses; + +/** + * :: DeveloperApi :: + * + * Listener providing a callback function to invoke when driver receives the response for the + * finalize shuffle merge request sent to remote shuffle service. + * + * @since 3.1.0 + */ +public interface MergeFinalizerListener extends EventListener { + /** + * Called once upon successful response on finalize shuffle merge on a remote shuffle service. + * The returned {@link MergeStatuses} is passed to the listener for further processing + */ + void onShuffleMergeSuccess(MergeStatuses statuses); + + /** + * Called once upon failure response on finalize shuffle merge on a remote shuffle service. + */ + void onShuffleMergeFailure(Throwable e); +}