[SPARK-32918][SHUFFLE] RPC implementation to support control plane coordination for push-based shuffle
### What changes were proposed in this pull request? This is one of the patches for SPIP SPARK-30602, which is needed for push-based shuffle. Summary of changes: This PR introduces a new RPC to be called within the driver. When the expected shuffle push wait time is reached, the driver will call this RPC to facilitate coordination of the shuffle map/reduce stages and notify external shuffle services to finalize the shuffle block merge for a given shuffle. Shuffle services also respond to the caller with the metadata about the merged shuffle partitions. ### Why are the changes needed? Refer to the SPIP in SPARK-30602. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This code won't be called by any existing code and will be tested after the coordinated driver changes get merged in SPARK-32920. Lead-authored-by: Min Shen <mshen@linkedin.com> Closes #30163 from zhouyejoe/SPARK-32918. Lead-authored-by: Ye Zhou <yezhou@linkedin.com> Co-authored-by: Min Shen <mshen@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
This commit is contained in:
parent
f83fcb1254
commit
1bd897cbc4
|
@ -147,6 +147,8 @@ public abstract class BlockStoreClient implements Closeable {
|
||||||
* @param blockIds block ids to be pushed
|
* @param blockIds block ids to be pushed
|
||||||
* @param buffers buffers to be pushed
|
* @param buffers buffers to be pushed
|
||||||
* @param listener the listener to receive block push status.
|
* @param listener the listener to receive block push status.
|
||||||
|
*
|
||||||
|
* @since 3.1.0
|
||||||
*/
|
*/
|
||||||
public void pushBlocks(
|
public void pushBlocks(
|
||||||
String host,
|
String host,
|
||||||
|
@ -156,4 +158,24 @@ public abstract class BlockStoreClient implements Closeable {
|
||||||
BlockFetchingListener listener) {
|
BlockFetchingListener listener) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
|
||||||
|
* for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
|
||||||
|
* finishing the shuffle merge process associated with the shuffle mapper stage.
|
||||||
|
*
|
||||||
|
* @param host host of shuffle server
|
||||||
|
* @param port port of shuffle server.
|
||||||
|
* @param shuffleId shuffle ID of the shuffle to be finalized
|
||||||
|
* @param listener the listener to receive MergeStatuses
|
||||||
|
*
|
||||||
|
* @since 3.1.0
|
||||||
|
*/
|
||||||
|
public void finalizeShuffleMerge(
|
||||||
|
String host,
|
||||||
|
int port,
|
||||||
|
int shuffleId,
|
||||||
|
MergeFinalizerListener listener) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,6 +158,35 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finalizeShuffleMerge(
|
||||||
|
String host,
|
||||||
|
int port,
|
||||||
|
int shuffleId,
|
||||||
|
MergeFinalizerListener listener) {
|
||||||
|
checkInit();
|
||||||
|
try {
|
||||||
|
TransportClient client = clientFactory.createClient(host, port);
|
||||||
|
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
|
||||||
|
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(ByteBuffer response) {
|
||||||
|
listener.onShuffleMergeSuccess(
|
||||||
|
(MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable e) {
|
||||||
|
listener.onShuffleMergeFailure(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
|
||||||
|
host, port, e);
|
||||||
|
listener.onShuffleMergeFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MetricSet shuffleMetrics() {
|
public MetricSet shuffleMetrics() {
|
||||||
checkInit();
|
checkInit();
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.network.shuffle;
|
||||||
|
|
||||||
|
import java.util.EventListener;
|
||||||
|
|
||||||
|
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* :: DeveloperApi ::
|
||||||
|
*
|
||||||
|
* Listener providing a callback function to invoke when driver receives the response for the
|
||||||
|
* finalize shuffle merge request sent to remote shuffle service.
|
||||||
|
*
|
||||||
|
* @since 3.1.0
|
||||||
|
*/
|
||||||
|
public interface MergeFinalizerListener extends EventListener {
|
||||||
|
/**
|
||||||
|
* Called once upon successful response on finalize shuffle merge on a remote shuffle service.
|
||||||
|
* The returned {@link MergeStatuses} is passed to the listener for further processing
|
||||||
|
*/
|
||||||
|
void onShuffleMergeSuccess(MergeStatuses statuses);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called once upon failure response on finalize shuffle merge on a remote shuffle service.
|
||||||
|
*/
|
||||||
|
void onShuffleMergeFailure(Throwable e);
|
||||||
|
}
|
Loading…
Reference in a new issue