From c4aa54ed4e8cf7942335bfafdeacf57b5d148f2a Mon Sep 17 00:00:00 2001 From: Min Shen Date: Mon, 26 Jul 2021 17:39:19 -0500 Subject: [PATCH] [SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for block push operations ### What changes were proposed in this pull request? This is a follow-up to #29855, addressing the [comments](https://github.com/apache/spark/pull/29855/files#r505536514). In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These two interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names generic enough to be reused for both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. ### Why are the changes needed? To make the code cleaner without sacrificing backward compatibility. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen Co-authored-by: Min Shen Signed-off-by: Mridul Muralidharan --- .../shuffle/BlockFetchingListener.java | 19 +- .../network/shuffle/BlockPushingListener.java | 54 ++++++ .../network/shuffle/BlockStoreClient.java | 2 +- .../shuffle/BlockTransferListener.java | 44 +++++ .../spark/network/shuffle/ErrorHandler.java | 4 +- .../shuffle/ExternalBlockStoreClient.java | 30 +-- .../network/shuffle/OneForOneBlockPusher.java | 41 ++-- ...cher.java => RetryingBlockTransferor.java} | 176 +++++++++++------- .../shuffle/OneForOneBlockPusherSuite.java | 36 ++-- ...java => RetryingBlockTransferorSuite.java} | 44 +++-- .../netty/NettyBlockTransferService.scala | 14 +- .../spark/shuffle/ShuffleBlockPusher.scala | 8 +- .../NettyBlockTransferServiceSuite.scala | 2 +- .../shuffle/ShuffleBlockPusherSuite.scala | 38 ++-- 14 files changed, 347 insertions(+), 165 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java rename common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/{RetryingBlockFetcher.java => RetryingBlockTransferor.java} (51%) rename common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/{RetryingBlockFetcherSuite.java => RetryingBlockTransferorSuite.java} (85%) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java index 138fd5389c..0be913e4d8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java @@ -17,11 +17,9 @@ package org.apache.spark.network.shuffle; -import java.util.EventListener; - import org.apache.spark.network.buffer.ManagedBuffer; -public interface BlockFetchingListener extends EventListener { +public interface BlockFetchingListener extends BlockTransferListener { /** * Called once
per successfully fetched block. After this call returns, data will be released * automatically. If the data will be passed to another thread, the receiver should retain() @@ -33,4 +31,19 @@ public interface BlockFetchingListener extends EventListener { * Called at least once per block upon failures. */ void onBlockFetchFailure(String blockId, Throwable exception); + + @Override + default void onBlockTransferSuccess(String blockId, ManagedBuffer data) { + onBlockFetchSuccess(blockId, data); + } + + @Override + default void onBlockTransferFailure(String blockId, Throwable exception) { + onBlockFetchFailure(blockId, exception); + } + + @Override + default String getTransferType() { + return "fetch"; + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java new file mode 100644 index 0000000000..1421b22cd4 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import org.apache.spark.network.buffer.ManagedBuffer; + +/** + * Callback to handle block push success and failure. This interface and + * {@link BlockFetchingListener} are unified under {@link BlockTransferListener} to allow + * code reuse for handling block push and fetch retry. + */ +public interface BlockPushingListener extends BlockTransferListener { + /** + * Called once per successfully pushed block. After this call returns, data will be released + * automatically. If the data will be passed to another thread, the receiver should retain() + * and release() the buffer on their own, or copy the data to a new buffer. + */ + void onBlockPushSuccess(String blockId, ManagedBuffer data); + + /** + * Called at least once per block upon failures. 
+ */ + void onBlockPushFailure(String blockId, Throwable exception); + + @Override + default void onBlockTransferSuccess(String blockId, ManagedBuffer data) { + onBlockPushSuccess(blockId, data); + } + + @Override + default void onBlockTransferFailure(String blockId, Throwable exception) { + onBlockPushFailure(blockId, exception); + } + + @Override + default String getTransferType() { + return "push"; + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 238d26ee50..b6852130c9 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -155,7 +155,7 @@ public abstract class BlockStoreClient implements Closeable { int port, String[] blockIds, ManagedBuffer[] buffers, - BlockFetchingListener listener) { + BlockPushingListener listener) { throw new UnsupportedOperationException(); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java new file mode 100644 index 0000000000..e019dabcba --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.EventListener; + +import org.apache.spark.network.buffer.ManagedBuffer; + +/** + * This interface unifies both {@link BlockFetchingListener} and {@link BlockPushingListener} + * under a single interface to allow code reuse, while also keeping the existing public interface + * to facilitate backward compatibility. + */ +public interface BlockTransferListener extends EventListener { + /** + * Called once per successfully transferred block. + */ + void onBlockTransferSuccess(String blockId, ManagedBuffer data); + + /** + * Called at least once per block transfer failure.
+ */ + void onBlockTransferFailure(String blockId, Throwable exception); + + /** + * Returns a string indicating the type of the listener, such as fetch, push, or something else. + */ + String getTransferType(); +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java index 2e15671a25..a75887525e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java @@ -25,9 +25,9 @@ import com.google.common.base.Throwables; import org.apache.spark.annotation.Evolving; /** - * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried + * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried * and logged. - * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when + * Note: {@link RetryingBlockTransferor} will delegate the exception to this handler only when * - remaining retries < max retries * - exception is an IOException * diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 63bf787195..f88915b504 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -95,13 +95,15 @@ public class ExternalBlockStoreClient extends BlockStoreClient { logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { int maxRetries = conf.maxIORetries(); - RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = + RetryingBlockTransferor.BlockTransferStarter blockFetchStarter = (inputBlockId, inputListener) -> { // Unless this client is closed. if (clientFactory != null) { + assert inputListener instanceof BlockFetchingListener : + "Expecting a BlockFetchingListener, but got " + inputListener.getClass(); TransportClient client = clientFactory.createClient(host, port, maxRetries > 0); - new OneForOneBlockFetcher(client, appId, execId, - inputBlockId, inputListener, conf, downloadFileManager).start(); + new OneForOneBlockFetcher(client, appId, execId, inputBlockId, + (BlockFetchingListener) inputListener, conf, downloadFileManager).start(); } else { logger.info("This clientFactory was closed. Skipping further block fetch retries."); } @@ -110,7 +112,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient { if (maxRetries > 0) { // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's // a bug in this code. We should remove the if statement once we're sure of the stability.
- new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start(); + new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start(); } else { blockFetchStarter.createAndStart(blockIds, listener); } @@ -128,7 +130,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient { int port, String[] blockIds, ManagedBuffer[] buffers, - BlockFetchingListener listener) { + BlockPushingListener listener) { checkInit(); assert blockIds.length == buffers.length : "Number of block ids and buffers do not match."; @@ -138,15 +140,21 @@ public class ExternalBlockStoreClient extends BlockStoreClient { } logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port); try { - RetryingBlockFetcher.BlockFetchStarter blockPushStarter = + RetryingBlockTransferor.BlockTransferStarter blockPushStarter = (inputBlockId, inputListener) -> { - TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId, - inputListener, buffersWithId).start(); + if (clientFactory != null) { + assert inputListener instanceof BlockPushingListener : + "Expecting a BlockPushingListener, but got " + inputListener.getClass(); + TransportClient client = clientFactory.createClient(host, port); + new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId, + (BlockPushingListener) inputListener, buffersWithId).start(); + } else { + logger.info("This clientFactory was closed. Skipping further block push retries."); + } }; int maxRetries = conf.maxIORetries(); if (maxRetries > 0) { - new RetryingBlockFetcher( + new RetryingBlockTransferor( conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start(); } else { blockPushStarter.createAndStart(blockIds, listener); @@ -154,7 +162,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient { } catch (Exception e) { logger.error("Exception while beginning pushBlocks", e); for (String blockId : blockIds) { - listener.onBlockFetchFailure(blockId, e); + listener.onBlockPushFailure(blockId, e); } } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java index b8b32e2755..0e1c59f352 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java @@ -47,7 +47,7 @@ public class OneForOneBlockPusher { private final String appId; private final int appAttemptId; private final String[] blockIds; - private final BlockFetchingListener listener; + private final BlockPushingListener listener; private final Map buffers; public OneForOneBlockPusher( @@ -55,7 +55,7 @@ public class OneForOneBlockPusher { String appId, int appAttemptId, String[] blockIds, - BlockFetchingListener listener, + BlockPushingListener listener, Map buffers) { this.client = client; this.appId = appId; @@ -78,21 +78,34 @@ public class OneForOneBlockPusher { @Override public void onSuccess(ByteBuffer response) { // On receipt of a successful block push - listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0))); + listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0))); } @Override public void onFailure(Throwable e) { - // Since block push is best effort, i.e., if we encountered a block push failure that's not - // retriable or 
exceeding the max retires, we should not fail all remaining block pushes. - // The best effort nature makes block push tolerable of a partial completion. Thus, we only - // fail the block that's actually failed. Not that, on the RetryingBlockFetcher side, once - // retry is initiated, it would still invalidate the previous active retry listener, and - // retry all outstanding blocks. We are preventing forwarding unnecessary block push failures - // to the parent listener of the retry listener. The only exceptions would be if the block - // push failure is due to block arriving on the server side after merge finalization, or the - // client fails to establish connection to the server side. In both cases, we would fail all - // remaining blocks. + // Since block push is best effort, i.e., if we encounter a block push failure that's still + // retriable according to ErrorHandler (not a connection exception and the block is not too + // late), we should not fail all remaining block pushes even though + // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry + // count, etc.). The best-effort nature makes block push tolerant of partial completion. + // Thus, we only fail the block that's actually failed in this case. Note that, on the + // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the + // previous active retry listener, and retry pushing all outstanding blocks. However, since + // the blocks to be pushed are preloaded into memory and the first attempt of pushing these + // blocks might have already succeeded, retry pushing all the outstanding blocks should be + // very cheap (on the client side, the block data is in memory; on the server side, the block + // will be recognized as a duplicate which triggers noop handling). Here, by failing only the + // one block that's actually failed, we are essentially preventing forwarding unnecessary + // block push failures to the parent listener of the retry listener. + // + // Take the following as an example. For the common exception during block push handling, + // i.e. block collision, it is considered retriable by ErrorHandler but not retriable + // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the + // one block encountering this issue, not the remaining blocks in the same batch. On the + // RetryingBlockTransferor side, since this exception is considered not retriable, it + // would immediately invoke the parent listener's onBlockTransferFailure. However, the remaining + // blocks in the same batch would remain current and active and they won't be impacted by + // this exception.
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) { String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1); failRemainingBlocks(targetBlockId, e); @@ -106,7 +119,7 @@ public class OneForOneBlockPusher { private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { for (String blockId : failedBlockIds) { try { - listener.onBlockFetchFailure(blockId, e); + listener.onBlockPushFailure(blockId, e); } catch (Exception e2) { logger.error("Error in block push failure callback", e2); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java similarity index 51% rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index 43bde1610e..512e4a52c8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -34,44 +34,44 @@ import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; /** - * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to - * IOExceptions, which we hope are due to transient network conditions. + * Wraps another BlockFetcher or BlockPusher with the ability to automatically retry block + * transfers which fail due to IOExceptions, which we hope are due to transient network conditions. * - * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In + * This transferor provides stronger guarantees regarding the parent BlockTransferListener. In * particular, the listener will be invoked exactly once per blockId, with a success or failure. */ -public class RetryingBlockFetcher { +public class RetryingBlockTransferor { /** - * Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any - * remaining blocks. + * Used to initiate the first transfer for all blocks, and subsequently for retrying the + * transfer on any remaining blocks. */ - public interface BlockFetchStarter { + public interface BlockTransferStarter { /** - * Creates a new BlockFetcher to fetch the given block ids which may do some synchronous - * bootstrapping followed by fully asynchronous block fetching. - * The BlockFetcher must eventually invoke the Listener on every input blockId, or else this - * method must throw an exception. + * Creates a new BlockFetcher or BlockPusher to transfer the given block ids which may do + * some synchronous bootstrapping followed by fully asynchronous block transferring. + * The BlockFetcher or BlockPusher must eventually invoke the Listener on every input blockId, + * or else this method must throw an exception. * * This method should always attempt to get a new TransportClient from the * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection * issues. */ - void createAndStart(String[] blockIds, BlockFetchingListener listener) + void createAndStart(String[] blockIds, BlockTransferListener listener) throws IOException, InterruptedException; } /** Shared executor service used for waiting and retrying. 
*/ private static final ExecutorService executorService = Executors.newCachedThreadPool( - NettyUtils.createThreadFactory("Block Fetch Retry")); + NettyUtils.createThreadFactory("Block Transfer Retry")); - private static final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class); + private static final Logger logger = LoggerFactory.getLogger(RetryingBlockTransferor.class); - /** Used to initiate new Block Fetches on our remaining blocks. */ - private final BlockFetchStarter fetchStarter; + /** Used to initiate new block transfers on our remaining blocks. */ + private final BlockTransferStarter transferStarter; - /** Parent listener which we delegate all successful or permanently failed block fetches to. */ - private final BlockFetchingListener listener; + /** Parent listener which we delegate all successful or permanently failed block transfers to. */ + private final BlockTransferListener listener; /** Max number of times we are allowed to retry. */ private final int maxRetries; @@ -86,80 +86,82 @@ public class RetryingBlockFetcher { private int retryCount = 0; /** - * Set of all block ids which have not been fetched successfully or with a non-IO Exception. + * Set of all block ids which have not been transferred successfully or with a non-IO Exception. * A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet, * input ordering is preserved, so we always request blocks in the same order the user provided. */ private final LinkedHashSet outstandingBlocksIds; /** - * The BlockFetchingListener that is active with our current BlockFetcher. + * The BlockTransferListener that is active with our current BlockFetcher or BlockPusher. * When we start a retry, we immediately replace this with a new Listener, which causes any * old Listeners to ignore all further responses. */ - private RetryingBlockFetchListener currentListener; + private RetryingBlockTransferListener currentListener; private final ErrorHandler errorHandler; - public RetryingBlockFetcher( + public RetryingBlockTransferor( TransportConf conf, - RetryingBlockFetcher.BlockFetchStarter fetchStarter, + BlockTransferStarter transferStarter, String[] blockIds, - BlockFetchingListener listener, + BlockTransferListener listener, ErrorHandler errorHandler) { - this.fetchStarter = fetchStarter; + this.transferStarter = transferStarter; this.listener = listener; this.maxRetries = conf.maxIORetries(); this.retryWaitTime = conf.ioRetryWaitTimeMs(); this.outstandingBlocksIds = Sets.newLinkedHashSet(); Collections.addAll(outstandingBlocksIds, blockIds); - this.currentListener = new RetryingBlockFetchListener(); + this.currentListener = new RetryingBlockTransferListener(); this.errorHandler = errorHandler; } - public RetryingBlockFetcher( + public RetryingBlockTransferor( TransportConf conf, - BlockFetchStarter fetchStarter, + BlockTransferStarter transferStarter, String[] blockIds, BlockFetchingListener listener) { - this(conf, fetchStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER); + this(conf, transferStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER); } /** - * Initiates the fetch of all blocks provided in the constructor, with possible retries in the - * event of transient IOExceptions. + * Initiates the transfer of all blocks provided in the constructor, with possible retries + * in the event of transient IOExceptions.
*/ public void start() { - fetchAllOutstanding(); + transferAllOutstanding(); } /** - * Fires off a request to fetch all blocks that have not been fetched successfully or permanently - * failed (i.e., by a non-IOException). + * Fires off a request to transfer all blocks that have not been transferred successfully or + * permanently failed (i.e., by a non-IOException). */ - private void fetchAllOutstanding() { + private void transferAllOutstanding() { // Start by retrieving our shared state within a synchronized block. - String[] blockIdsToFetch; + String[] blockIdsToTransfer; int numRetries; - RetryingBlockFetchListener myListener; + RetryingBlockTransferListener myListener; synchronized (this) { - blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]); + blockIdsToTransfer = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]); numRetries = retryCount; myListener = currentListener; } - // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails. + // Now initiate the transfer on all outstanding blocks, possibly initiating a retry if that + // fails. try { - fetchStarter.createAndStart(blockIdsToFetch, myListener); + transferStarter.createAndStart(blockIdsToTransfer, myListener); } catch (Exception e) { - logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s", - blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e); + logger.error(String.format("Exception while beginning %s of %s outstanding blocks %s", + listener.getTransferType(), blockIdsToTransfer.length, + numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e); if (shouldRetry(e)) { initiateRetry(); } else { - for (String bid : blockIdsToFetch) { - listener.onBlockFetchFailure(bid, e); + for (String bid : blockIdsToTransfer) { + listener.onBlockTransferFailure(bid, e); } } } @@ -167,23 +169,24 @@ public class RetryingBlockFetcher { /** * Lightweight method which initiates a retry in a different thread. The retry will involve - * calling fetchAllOutstanding() after a configured wait time. + * calling transferAllOutstanding() after a configured wait time. */ private synchronized void initiateRetry() { retryCount += 1; - currentListener = new RetryingBlockFetchListener(); + currentListener = new RetryingBlockTransferListener(); - logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms", - retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime); + logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms", + listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(), + retryWaitTime); executorService.submit(() -> { Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); - fetchAllOutstanding(); + transferAllOutstanding(); }); } /** - * Returns true if we should retry due a block fetch failure. We will retry if and only if + * Returns true if we should retry due to a block transfer failure. We will retry if and only if * the exception was an IOException and we haven't retried 'maxRetries' times already. */ private synchronized boolean shouldRetry(Throwable e) { @@ -194,17 +197,17 @@ public class RetryingBlockFetcher { } /** - * Our RetryListener intercepts block fetch responses and forwards them to our parent listener. - * Note that in the event of a retry, we will immediately replace the 'currentListener' field, - * indicating that any responses from non-current Listeners should be ignored.
+ * Our RetryListener intercepts block transfer responses and forwards them to our parent + * listener. Note that in the event of a retry, we will immediately replace the 'currentListener' + * field, indicating that any responses from non-current Listeners should be ignored. */ - private class RetryingBlockFetchListener implements BlockFetchingListener { - @Override - public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + private class RetryingBlockTransferListener implements + BlockFetchingListener, BlockPushingListener { + private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) { // We will only forward this success message to our parent listener if this block request is // outstanding and we are still the active listener. boolean shouldForwardSuccess = false; - synchronized (RetryingBlockFetcher.this) { + synchronized (RetryingBlockTransferor.this) { if (this == currentListener && outstandingBlocksIds.contains(blockId)) { outstandingBlocksIds.remove(blockId); shouldForwardSuccess = true; @@ -213,28 +216,27 @@ public class RetryingBlockFetcher { // Now actually invoke the parent listener, outside of the synchronized block. if (shouldForwardSuccess) { - listener.onBlockFetchSuccess(blockId, data); + listener.onBlockTransferSuccess(blockId, data); } } - @Override - public void onBlockFetchFailure(String blockId, Throwable exception) { + private void handleBlockTransferFailure(String blockId, Throwable exception) { // We will only forward this failure to our parent listener if this block request is - // outstanding, we are still the active listener, AND we cannot retry the fetch. + // outstanding, we are still the active listener, AND we cannot retry the transfer. boolean shouldForwardFailure = false; - synchronized (RetryingBlockFetcher.this) { + synchronized (RetryingBlockTransferor.this) { if (this == currentListener && outstandingBlocksIds.contains(blockId)) { if (shouldRetry(exception)) { initiateRetry(); } else { if (errorHandler.shouldLogError(exception)) { logger.error( - String.format("Failed to fetch block %s, and will not retry (%s retries)", - blockId, retryCount), exception); + String.format("Failed to %s block %s, and will not retry (%s retries)", + listener.getTransferType(), blockId, retryCount), exception); } else { logger.debug( - String.format("Failed to fetch block %s, and will not retry (%s retries)", - blockId, retryCount), exception); + String.format("Failed to %s block %s, and will not retry (%s retries)", + listener.getTransferType(), blockId, retryCount), exception); } outstandingBlocksIds.remove(blockId); shouldForwardFailure = true; @@ -244,8 +246,48 @@ public class RetryingBlockFetcher { // Now actually invoke the parent listener, outside of the synchronized block. 
if (shouldForwardFailure) { - listener.onBlockFetchFailure(blockId, exception); + listener.onBlockTransferFailure(blockId, exception); } } + + @Override + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + handleBlockTransferSuccess(blockId, data); + } + + @Override + public void onBlockFetchFailure(String blockId, Throwable exception) { + handleBlockTransferFailure(blockId, exception); + } + + @Override + public void onBlockPushSuccess(String blockId, ManagedBuffer data) { + handleBlockTransferSuccess(blockId, data); + } + + @Override + public void onBlockPushFailure(String blockId, Throwable exception) { + handleBlockTransferFailure(blockId, exception); + } + + // RetryingBlockTransferListener's onBlockTransferSuccess and onBlockTransferFailure + // shouldn't be invoked. We only invoke these 2 methods on the parent listener. + @Override + public void onBlockTransferSuccess(String blockId, ManagedBuffer data) { + throw new RuntimeException( + "Invocation on RetryingBlockTransferListener.onBlockTransferSuccess is unexpected."); + } + + @Override + public void onBlockTransferFailure(String blockId, Throwable exception) { + throw new RuntimeException( + "Invocation on RetryingBlockTransferListener.onBlockTransferFailure is unexpected."); + } + + @Override + public String getTransferType() { + throw new RuntimeException( + "Invocation on RetryingBlockTransferListener.getTransferType is unexpected."); + } } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java index e41198f8ae..f709a565c0 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java @@ -48,12 +48,12 @@ public class OneForOneBlockPusherSuite { blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1]))); String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); - BlockFetchingListener listener = pushBlocks( + BlockPushingListener listener = pushBlocks( blocks, blockIds, Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0))); - verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); + verify(listener).onBlockPushSuccess(eq("shufflePush_0_0_0"), any()); } @Test @@ -64,16 +64,16 @@ public class OneForOneBlockPusherSuite { blocks.put("shufflePush_0_2_0", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23]))); String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); - BlockFetchingListener listener = pushBlocks( + BlockPushingListener listener = pushBlocks( blocks, blockIds, Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0), new PushBlockStream("app-id", 0, 0, 1, 0, 1), new PushBlockStream("app-id", 0, 0, 2, 0, 2))); - verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); - verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any()); - verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_2_0"), any()); + verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any()); + verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_1_0"), any()); + verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_2_0"), any()); } @Test @@ -84,16 +84,16 @@ public class OneForOneBlockPusherSuite { blocks.put("shufflePush_0_2_0", new 
NioManagedBuffer(ByteBuffer.wrap(new byte[0]))); String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); - BlockFetchingListener listener = pushBlocks( + BlockPushingListener listener = pushBlocks( blocks, blockIds, Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), new PushBlockStream("app-id", 0, 0, 1, 0, 1), new PushBlockStream("app-id", 0, 0, 2, 0, 2))); - verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); - verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any()); - verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any()); + verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any()); + verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), any()); + verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_2_0"), any()); } @Test @@ -104,18 +104,18 @@ public class OneForOneBlockPusherSuite { blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0]))); String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); - BlockFetchingListener listener = pushBlocks( + BlockPushingListener listener = pushBlocks( blocks, blockIds, Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), new PushBlockStream("app-id", 0, 0, 1, 0, 1), new PushBlockStream("app-id", 0, 0, 2, 0, 2))); - verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); - verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any()); - verify(listener, times(0)).onBlockFetchFailure(eq("shufflePush_0_0_0"), any()); - verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any()); - verify(listener, times(2)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any()); + verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any()); + verify(listener, times(0)).onBlockPushSuccess(not(eq("shufflePush_0_0_0")), any()); + verify(listener, times(0)).onBlockPushFailure(eq("shufflePush_0_0_0"), any()); + verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), any()); + verify(listener, times(2)).onBlockPushFailure(eq("shufflePush_0_2_0"), any()); } /** @@ -123,12 +123,12 @@ public class OneForOneBlockPusherSuite { * If a block is an empty byte, a server side retriable exception will be thrown. * If a block is null, a non-retriable exception will be thrown. 
*/ - private static BlockFetchingListener pushBlocks( + private static BlockPushingListener pushBlocks( LinkedHashMap blocks, String[] blockIds, Iterable expectMessages) { TransportClient client = mock(TransportClient.class); - BlockFetchingListener listener = mock(BlockFetchingListener.class); + BlockPushingListener listener = mock(BlockPushingListener.class); OneForOneBlockPusher pusher = new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java similarity index 85% rename from common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java rename to common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java index 6f90df5f61..1b44b061f3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java @@ -38,13 +38,13 @@ import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; -import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter; +import static org.apache.spark.network.shuffle.RetryingBlockTransferor.BlockTransferStarter; /** * Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to * fetch the lost blocks. */ -public class RetryingBlockFetcherSuite { +public class RetryingBlockTransferorSuite { private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13])); private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); @@ -64,8 +64,8 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener).onBlockFetchSuccess("b0", block0); - verify(listener).onBlockFetchSuccess("b1", block1); + verify(listener).onBlockTransferSuccess("b0", block0); + verify(listener).onBlockTransferSuccess("b1", block1); verifyNoMoreInteractions(listener); } @@ -83,8 +83,9 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener).onBlockFetchFailure(eq("b0"), any()); - verify(listener).onBlockFetchSuccess("b1", block1); + verify(listener).onBlockTransferFailure(eq("b0"), any()); + verify(listener).onBlockTransferSuccess("b1", block1); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -106,8 +107,9 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1); + verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -128,8 +130,9 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1); + verify(listener, 
timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -156,8 +159,9 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1); + verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -188,8 +192,9 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); + verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any()); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -218,9 +223,10 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); - verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2); + verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any()); + verify(listener, timeout(5000)).onBlockTransferSuccess("b2", block2); + verify(listener, atLeastOnce()).getTransferType(); verifyNoMoreInteractions(listener); } @@ -243,7 +249,7 @@ public class RetryingBlockFetcherSuite { "spark.shuffle.io.maxRetries", "2", "spark.shuffle.io.retryWait", "0")); TransportConf conf = new TransportConf("shuffle", provider); - BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class); + BlockTransferStarter fetchStarter = mock(BlockTransferStarter.class); Stubber stub = null; @@ -293,6 +299,6 @@ public class RetryingBlockFetcherSuite { assertNotNull(stub); stub.when(fetchStarter).createAndStart(any(), any()); String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); - new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); + new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener).start(); } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 828849812b..4e0beeaec9 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -36,7 +36,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ -import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher} +import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockTransferor} import 
org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.RpcEndpointRef @@ -116,13 +116,15 @@ private[spark] class NettyBlockTransferService( } try { val maxRetries = transportConf.maxIORetries() - val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { + val blockFetchStarter = new RetryingBlockTransferor.BlockTransferStarter { override def createAndStart(blockIds: Array[String], - listener: BlockFetchingListener): Unit = { + listener: BlockTransferListener): Unit = { + assert(listener.isInstanceOf[BlockFetchingListener], + s"Expecting a BlockFetchingListener, but got ${listener.getClass}") try { val client = clientFactory.createClient(host, port, maxRetries > 0) - new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, - transportConf, tempFileManager).start() + new OneForOneBlockFetcher(client, appId, execId, blockIds, + listener.asInstanceOf[BlockFetchingListener], transportConf, tempFileManager).start() } catch { case e: IOException => Try { @@ -140,7 +142,7 @@ private[spark] class NettyBlockTransferService( if (maxRetries > 0) { // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's // a bug in this code. We should remove the if statement once we're sure of the stability. - new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() + new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start() } else { blockFetchStarter.createAndStart(blockIds, listener) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala index 53687bbd27..544c7536b1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.BlockFetchingListener +import org.apache.spark.network.shuffle.BlockPushingListener import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler import org.apache.spark.network.util.TransportConf import org.apache.spark.shuffle.ShuffleBlockPusher._ @@ -205,7 +205,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging { val blockIds = request.blocks.map(_._1.toString) val remainingBlocks = new HashSet[String]() ++= blockIds - val blockPushListener = new BlockFetchingListener { + val blockPushListener = new BlockPushingListener { // Initiating a connection and pushing blocks to a remote shuffle service is always handled by // the block-push-threads. 
We should not initiate the connection creation in the // blockPushListener callbacks which are invoked by the netty eventloop because: @@ -224,12 +224,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging { }) } - override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { + override def onBlockPushSuccess(blockId: String, data: ManagedBuffer): Unit = { logTrace(s"Push for block $blockId to $address successful.") handleResult(PushResult(blockId, null)) } - override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { + override def onBlockPushFailure(blockId: String, exception: Throwable): Unit = { // check the message or its cause to see if it needs to be logged. if (!errorHandler.shouldLogError(exception)) { logTrace(s"Pushing block $blockId to $address failed.", exception) diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala index c8a8f37212..3a6bc47257 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala @@ -114,7 +114,7 @@ class NettyBlockTransferServiceSuite val listener = mock(classOf[BlockFetchingListener]) var hitExecutorDeadException = false - when(listener.onBlockFetchFailure(any(), any(classOf[ExecutorDeadException]))) + when(listener.onBlockTransferFailure(any(), any(classOf[ExecutorDeadException]))) .thenAnswer(_ => {hitExecutorDeadException = true}) service0 = createService(port, driverEndpointRef) diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala index 6a07fefad2..2800be1cb9 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala @@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark._ import org.apache.spark.network.buffer.ManagedBuffer -import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockStoreClient} +import org.apache.spark.network.shuffle.{BlockPushingListener, BlockStoreClient} import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler import org.apache.spark.network.util.TransportConf import org.apache.spark.serializer.JavaSerializer @@ -75,9 +75,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]] pushedBlocks ++= blocks val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]] - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] (blocks, managedBuffers).zipped.foreach((blockId, buffer) => { - blockFetchListener.onBlockFetchSuccess(blockId, buffer) + blockPushListener.onBlockPushSuccess(blockId, buffer) }) }) } @@ -164,24 +164,24 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { test("Hit maxBlocksInFlightPerAddress limit so that the blocks are deferred") { conf.set("spark.reducer.maxBlocksInFlightPerAddress", "2") var blockPendingResponse : String = null - var listener : BlockFetchingListener = null + var listener : BlockPushingListener = null
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any())) .thenAnswer((invocation: InvocationOnMock) => { val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]] pushedBlocks ++= blocks val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]] - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] // Expecting 2 blocks assert(blocks.length == 2) if (blockPendingResponse == null) { blockPendingResponse = blocks(1) - listener = blockFetchListener + listener = blockPushListener // Respond with success only for the first block which will cause all the rest of the // blocks to be deferred - blockFetchListener.onBlockFetchSuccess(blocks(0), managedBuffers(0)) + blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0)) } else { (blocks, managedBuffers).zipped.foreach((blockId, buffer) => { - blockFetchListener.onBlockFetchSuccess(blockId, buffer) + blockPushListener.onBlockPushSuccess(blockId, buffer) }) } }) @@ -193,7 +193,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { .pushBlocks(any(), any(), any(), any(), any()) assert(pushedBlocks.length == 2) // this will trigger push of deferred blocks - listener.onBlockFetchSuccess(blockPendingResponse, mock(classOf[ManagedBuffer])) + listener.onBlockPushSuccess(blockPendingResponse, mock(classOf[ManagedBuffer])) pusher.runPendingTasks() verify(shuffleClient, times(4)) .pushBlocks(any(), any(), any(), any(), any()) @@ -248,17 +248,17 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { when(shuffleClient.pushBlocks(any(), any(), any(), any(), any())) .thenAnswer((invocation: InvocationOnMock) => { val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]] - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] blocks.foreach(blockId => { if (failBlock) { failBlock = false // Fail the first block with the collision exception. - blockFetchListener.onBlockFetchFailure(blockId, new RuntimeException( + blockPushListener.onBlockPushFailure(blockId, new RuntimeException( new IllegalArgumentException( BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))) } else { pushedBlocks += blockId - blockFetchListener.onBlockFetchSuccess(blockId, mock(classOf[ManagedBuffer])) + blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer])) } }) }) @@ -278,16 +278,16 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { when(shuffleClient.pushBlocks(any(), any(), any(), any(), any())) .thenAnswer((invocation: InvocationOnMock) => { val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]] - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] blocks.foreach(blockId => { if (failBlock) { failBlock = false // Fail the first block with the too late exception. 
- blockFetchListener.onBlockFetchFailure(blockId, new RuntimeException( + blockPushListener.onBlockPushFailure(blockId, new RuntimeException( new IllegalArgumentException(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX))) } else { pushedBlocks += blockId - blockFetchListener.onBlockFetchSuccess(blockId, mock(classOf[ManagedBuffer])) + blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer])) } }) }) @@ -307,9 +307,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { .thenAnswer((invocation: InvocationOnMock) => { val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]] pushedBlocks ++= blocks - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] blocks.foreach(blockId => { - blockFetchListener.onBlockFetchFailure( + blockPushListener.onBlockPushFailure( blockId, new RuntimeException(new ConnectException())) }) }) @@ -332,9 +332,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { when(shuffleClient.pushBlocks(any(), any(), any(), any(), any())) .thenAnswer((invocation: InvocationOnMock) => { val pushedBlocks = invocation.getArguments()(2).asInstanceOf[Array[String]] - val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] pushedBlocks.foreach(blockId => { - blockFetchListener.onBlockFetchFailure( + blockPushListener.onBlockPushFailure( blockId, new IOException("Failed to send RPC", new FileNotFoundException("file not found"))) })
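
A note for reviewers on the listener unification in this patch: it boils down to Java 8 default methods. `BlockFetchingListener` and `BlockPushingListener` keep their legacy callback names and adapt them to the shared `BlockTransferListener` contract, so `RetryingBlockTransferor` can program against one type while existing external callers stay untouched. Below is a minimal, self-contained sketch of that pattern; the names (`TransferListener`, `FetchListener`, `ListenerUnificationDemo`) are invented stand-ins for illustration, not the actual Spark classes:

```java
import java.util.EventListener;

// Shared contract the retry machinery sees (mirrors BlockTransferListener).
interface TransferListener extends EventListener {
  void onTransferSuccess(String blockId, byte[] data);
  void onTransferFailure(String blockId, Throwable cause);
  String transferType();
}

// Fetch-side view keeping its legacy callbacks (mirrors BlockFetchingListener).
interface FetchListener extends TransferListener {
  void onFetchSuccess(String blockId, byte[] data);
  void onFetchFailure(String blockId, Throwable cause);

  // Default methods adapt the unified callbacks to the legacy ones.
  @Override
  default void onTransferSuccess(String blockId, byte[] data) {
    onFetchSuccess(blockId, data);
  }

  @Override
  default void onTransferFailure(String blockId, Throwable cause) {
    onFetchFailure(blockId, cause);
  }

  @Override
  default String transferType() {
    return "fetch";
  }
}

public class ListenerUnificationDemo {
  public static void main(String[] args) {
    // Callers still implement only the legacy fetch callbacks...
    TransferListener listener = new FetchListener() {
      @Override
      public void onFetchSuccess(String blockId, byte[] data) {
        System.out.println("fetched " + blockId + " (" + data.length + " bytes)");
      }

      @Override
      public void onFetchFailure(String blockId, Throwable cause) {
        System.err.println("fetch of " + blockId + " failed: " + cause);
      }
    };
    // ...while the retry machinery invokes only the unified contract.
    listener.onTransferSuccess("b0", new byte[13]); // prints "fetched b0 (13 bytes)"
    System.out.println(listener.transferType());    // prints "fetch"
  }
}
```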
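The other invariant worth keeping in mind while reviewing is the `currentListener` replacement trick in `RetryingBlockTransferor`: every retry installs a fresh inner listener, so callbacks from superseded attempts are ignored and the parent listener is invoked exactly once per blockId, as the class javadoc promises. The following is a toy model of that bookkeeping only, with invented names (`RetryAttemptDemo`, `startAttempt`, `onSuccess`), not the actual implementation:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class RetryAttemptDemo {
  // Blocks not yet successfully delivered (LinkedHashSet preserves order).
  private final Set<String> outstanding = new LinkedHashSet<>();
  // Identity token for the currently active attempt.
  private Object currentAttempt;

  // Begins a (re)try: registers the blocks and invalidates older attempts.
  synchronized Object startAttempt(String... blockIds) {
    Collections.addAll(outstanding, blockIds);
    currentAttempt = new Object();
    return currentAttempt;
  }

  // Returns true iff this success should be forwarded to the parent listener:
  // the attempt must still be current AND the block still outstanding.
  synchronized boolean onSuccess(Object attempt, String blockId) {
    return attempt == currentAttempt && outstanding.remove(blockId);
  }

  public static void main(String[] args) {
    RetryAttemptDemo demo = new RetryAttemptDemo();
    Object first = demo.startAttempt("b0", "b1");
    Object retry = demo.startAttempt("b0", "b1"); // e.g. after an IOException

    System.out.println(demo.onSuccess(first, "b0")); // false: stale attempt, ignored
    System.out.println(demo.onSuccess(retry, "b0")); // true: forward to parent
    System.out.println(demo.onSuccess(retry, "b0")); // false: already delivered once
  }
}
```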