[SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for block push operations
### What changes were proposed in this pull request? This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514) In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. ### Why are the changes needed? To make code cleaner without sacrificing backward compatibility. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen <mshen@linkedin.com> Co-authored-by: Min Shen <victor.nju@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
This commit is contained in:
parent
634f96dde4
commit
c4aa54ed4e
|
@ -17,11 +17,9 @@
|
||||||
|
|
||||||
package org.apache.spark.network.shuffle;
|
package org.apache.spark.network.shuffle;
|
||||||
|
|
||||||
import java.util.EventListener;
|
|
||||||
|
|
||||||
import org.apache.spark.network.buffer.ManagedBuffer;
|
import org.apache.spark.network.buffer.ManagedBuffer;
|
||||||
|
|
||||||
public interface BlockFetchingListener extends EventListener {
|
public interface BlockFetchingListener extends BlockTransferListener {
|
||||||
/**
|
/**
|
||||||
* Called once per successfully fetched block. After this call returns, data will be released
|
* Called once per successfully fetched block. After this call returns, data will be released
|
||||||
* automatically. If the data will be passed to another thread, the receiver should retain()
|
* automatically. If the data will be passed to another thread, the receiver should retain()
|
||||||
|
@ -33,4 +31,19 @@ public interface BlockFetchingListener extends EventListener {
|
||||||
* Called at least once per block upon failures.
|
* Called at least once per block upon failures.
|
||||||
*/
|
*/
|
||||||
void onBlockFetchFailure(String blockId, Throwable exception);
|
void onBlockFetchFailure(String blockId, Throwable exception);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
|
||||||
|
onBlockFetchSuccess(blockId, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default void onBlockTransferFailure(String blockId, Throwable exception) {
|
||||||
|
onBlockFetchFailure(blockId, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default String getTransferType() {
|
||||||
|
return "fetch";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.network.shuffle;
|
||||||
|
|
||||||
|
import org.apache.spark.network.buffer.ManagedBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback to handle block push success and failure. This interface and
|
||||||
|
* {@link BlockFetchingListener} are unified under {@link BlockTransferListener} to allow
|
||||||
|
* code reuse for handling block push and fetch retry.
|
||||||
|
*/
|
||||||
|
public interface BlockPushingListener extends BlockTransferListener {
|
||||||
|
/**
|
||||||
|
* Called once per successfully pushed block. After this call returns, data will be released
|
||||||
|
* automatically. If the data will be passed to another thread, the receiver should retain()
|
||||||
|
* and release() the buffer on their own, or copy the data to a new buffer.
|
||||||
|
*/
|
||||||
|
void onBlockPushSuccess(String blockId, ManagedBuffer data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called at least once per block upon failures.
|
||||||
|
*/
|
||||||
|
void onBlockPushFailure(String blockId, Throwable exception);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
|
||||||
|
onBlockPushSuccess(blockId, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default void onBlockTransferFailure(String blockId, Throwable exception) {
|
||||||
|
onBlockPushFailure(blockId, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default String getTransferType() {
|
||||||
|
return "push";
|
||||||
|
}
|
||||||
|
}
|
|
@ -155,7 +155,7 @@ public abstract class BlockStoreClient implements Closeable {
|
||||||
int port,
|
int port,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
ManagedBuffer[] buffers,
|
ManagedBuffer[] buffers,
|
||||||
BlockFetchingListener listener) {
|
BlockPushingListener listener) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.network.shuffle;
|
||||||
|
|
||||||
|
import java.util.EventListener;
|
||||||
|
|
||||||
|
import org.apache.spark.network.buffer.ManagedBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This interface unifies both {@link BlockFetchingListener} and {@link BlockPushingListener}
|
||||||
|
* under a single interface to allow code reuse, while also keeping the existing public interface
|
||||||
|
* to facilitate backward compatibility.
|
||||||
|
*/
|
||||||
|
public interface BlockTransferListener extends EventListener {
|
||||||
|
/**
|
||||||
|
* Called once per successfully transferred block.
|
||||||
|
*/
|
||||||
|
void onBlockTransferSuccess(String blockId, ManagedBuffer data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called at least once per block transfer failures.
|
||||||
|
*/
|
||||||
|
void onBlockTransferFailure(String blockId, Throwable exception);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a string indicating the type of the listener such as fetch, push, or something else
|
||||||
|
*/
|
||||||
|
String getTransferType();
|
||||||
|
}
|
|
@ -25,9 +25,9 @@ import com.google.common.base.Throwables;
|
||||||
import org.apache.spark.annotation.Evolving;
|
import org.apache.spark.annotation.Evolving;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
|
* Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
|
||||||
* and logged.
|
* and logged.
|
||||||
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
|
* Note: {@link RetryingBlockTransferor} will delegate the exception to this handler only when
|
||||||
* - remaining retries < max retries
|
* - remaining retries < max retries
|
||||||
* - exception is an IOException
|
* - exception is an IOException
|
||||||
*
|
*
|
||||||
|
|
|
@ -95,13 +95,15 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
|
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
|
||||||
try {
|
try {
|
||||||
int maxRetries = conf.maxIORetries();
|
int maxRetries = conf.maxIORetries();
|
||||||
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
|
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
|
||||||
(inputBlockId, inputListener) -> {
|
(inputBlockId, inputListener) -> {
|
||||||
// Unless this client is closed.
|
// Unless this client is closed.
|
||||||
if (clientFactory != null) {
|
if (clientFactory != null) {
|
||||||
|
assert inputListener instanceof BlockFetchingListener :
|
||||||
|
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
|
||||||
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
|
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
|
||||||
new OneForOneBlockFetcher(client, appId, execId,
|
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
|
||||||
inputBlockId, inputListener, conf, downloadFileManager).start();
|
(BlockFetchingListener) inputListener, conf, downloadFileManager).start();
|
||||||
} else {
|
} else {
|
||||||
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
|
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
|
||||||
}
|
}
|
||||||
|
@ -110,7 +112,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
if (maxRetries > 0) {
|
if (maxRetries > 0) {
|
||||||
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
|
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
|
||||||
// a bug in this code. We should remove the if statement once we're sure of the stability.
|
// a bug in this code. We should remove the if statement once we're sure of the stability.
|
||||||
new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();
|
new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
|
||||||
} else {
|
} else {
|
||||||
blockFetchStarter.createAndStart(blockIds, listener);
|
blockFetchStarter.createAndStart(blockIds, listener);
|
||||||
}
|
}
|
||||||
|
@ -128,7 +130,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
int port,
|
int port,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
ManagedBuffer[] buffers,
|
ManagedBuffer[] buffers,
|
||||||
BlockFetchingListener listener) {
|
BlockPushingListener listener) {
|
||||||
checkInit();
|
checkInit();
|
||||||
assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
|
assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
|
||||||
|
|
||||||
|
@ -138,15 +140,21 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
}
|
}
|
||||||
logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
|
logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
|
||||||
try {
|
try {
|
||||||
RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
|
RetryingBlockTransferor.BlockTransferStarter blockPushStarter =
|
||||||
(inputBlockId, inputListener) -> {
|
(inputBlockId, inputListener) -> {
|
||||||
TransportClient client = clientFactory.createClient(host, port);
|
if (clientFactory != null) {
|
||||||
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
|
assert inputListener instanceof BlockPushingListener :
|
||||||
inputListener, buffersWithId).start();
|
"Expecting a BlockPushingListener, but got " + inputListener.getClass();
|
||||||
|
TransportClient client = clientFactory.createClient(host, port);
|
||||||
|
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
|
||||||
|
(BlockPushingListener) inputListener, buffersWithId).start();
|
||||||
|
} else {
|
||||||
|
logger.info("This clientFactory was closed. Skipping further block push retries.");
|
||||||
|
}
|
||||||
};
|
};
|
||||||
int maxRetries = conf.maxIORetries();
|
int maxRetries = conf.maxIORetries();
|
||||||
if (maxRetries > 0) {
|
if (maxRetries > 0) {
|
||||||
new RetryingBlockFetcher(
|
new RetryingBlockTransferor(
|
||||||
conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
|
conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
|
||||||
} else {
|
} else {
|
||||||
blockPushStarter.createAndStart(blockIds, listener);
|
blockPushStarter.createAndStart(blockIds, listener);
|
||||||
|
@ -154,7 +162,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Exception while beginning pushBlocks", e);
|
logger.error("Exception while beginning pushBlocks", e);
|
||||||
for (String blockId : blockIds) {
|
for (String blockId : blockIds) {
|
||||||
listener.onBlockFetchFailure(blockId, e);
|
listener.onBlockPushFailure(blockId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class OneForOneBlockPusher {
|
||||||
private final String appId;
|
private final String appId;
|
||||||
private final int appAttemptId;
|
private final int appAttemptId;
|
||||||
private final String[] blockIds;
|
private final String[] blockIds;
|
||||||
private final BlockFetchingListener listener;
|
private final BlockPushingListener listener;
|
||||||
private final Map<String, ManagedBuffer> buffers;
|
private final Map<String, ManagedBuffer> buffers;
|
||||||
|
|
||||||
public OneForOneBlockPusher(
|
public OneForOneBlockPusher(
|
||||||
|
@ -55,7 +55,7 @@ public class OneForOneBlockPusher {
|
||||||
String appId,
|
String appId,
|
||||||
int appAttemptId,
|
int appAttemptId,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
BlockFetchingListener listener,
|
BlockPushingListener listener,
|
||||||
Map<String, ManagedBuffer> buffers) {
|
Map<String, ManagedBuffer> buffers) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.appId = appId;
|
this.appId = appId;
|
||||||
|
@ -78,21 +78,34 @@ public class OneForOneBlockPusher {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(ByteBuffer response) {
|
public void onSuccess(ByteBuffer response) {
|
||||||
// On receipt of a successful block push
|
// On receipt of a successful block push
|
||||||
listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
|
listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public void onFailure(Throwable e) {
|
||||||
// Since block push is best effort, i.e., if we encountered a block push failure that's not
|
// Since block push is best effort, i.e., if we encounter a block push failure that's still
|
||||||
// retriable or exceeding the max retires, we should not fail all remaining block pushes.
|
// retriable according to ErrorHandler (not a connection exception and the block is not too
|
||||||
// The best effort nature makes block push tolerable of a partial completion. Thus, we only
|
// late), we should not fail all remaining block pushes even though
|
||||||
// fail the block that's actually failed. Not that, on the RetryingBlockFetcher side, once
|
// RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
|
||||||
// retry is initiated, it would still invalidate the previous active retry listener, and
|
// count etc). The best effort nature makes block push tolerable of a partial completion.
|
||||||
// retry all outstanding blocks. We are preventing forwarding unnecessary block push failures
|
// Thus, we only fail the block that's actually failed in this case. Note that, on the
|
||||||
// to the parent listener of the retry listener. The only exceptions would be if the block
|
// RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
|
||||||
// push failure is due to block arriving on the server side after merge finalization, or the
|
// previous active retry listener, and retry pushing all outstanding blocks. However, since
|
||||||
// client fails to establish connection to the server side. In both cases, we would fail all
|
// the blocks to be pushed are preloaded into memory and the first attempt of pushing these
|
||||||
// remaining blocks.
|
// blocks might have already succeeded, retry pushing all the outstanding blocks should be
|
||||||
|
// very cheap (on the client side, the block data is in memory; on the server side, the block
|
||||||
|
// will be recognized as a duplicate which triggers noop handling). Here, by failing only the
|
||||||
|
// one block that's actually failed, we are essentially preventing forwarding unnecessary
|
||||||
|
// block push failures to the parent listener of the retry listener.
|
||||||
|
//
|
||||||
|
// Take the following as an example. For the common exception during block push handling,
|
||||||
|
// i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
|
||||||
|
// by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
|
||||||
|
// one block encountering this issue not the remaining blocks in the same batch. On the
|
||||||
|
// RetryingBlockTransferor side, since this exception is considered as not retriable, it
|
||||||
|
// would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
|
||||||
|
// blocks in the same batch would remain current and active and they won't be impacted by
|
||||||
|
// this exception.
|
||||||
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
|
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
|
||||||
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
|
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
|
||||||
failRemainingBlocks(targetBlockId, e);
|
failRemainingBlocks(targetBlockId, e);
|
||||||
|
@ -106,7 +119,7 @@ public class OneForOneBlockPusher {
|
||||||
private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
|
private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
|
||||||
for (String blockId : failedBlockIds) {
|
for (String blockId : failedBlockIds) {
|
||||||
try {
|
try {
|
||||||
listener.onBlockFetchFailure(blockId, e);
|
listener.onBlockPushFailure(blockId, e);
|
||||||
} catch (Exception e2) {
|
} catch (Exception e2) {
|
||||||
logger.error("Error in block push failure callback", e2);
|
logger.error("Error in block push failure callback", e2);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,44 +34,44 @@ import org.apache.spark.network.util.NettyUtils;
|
||||||
import org.apache.spark.network.util.TransportConf;
|
import org.apache.spark.network.util.TransportConf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
|
* Wraps another BlockFetcher or BlockPusher with the ability to automatically retry block
|
||||||
* IOExceptions, which we hope are due to transient network conditions.
|
* transfers which fail due to IOExceptions, which we hope are due to transient network conditions.
|
||||||
*
|
*
|
||||||
* This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
|
* This transferor provides stronger guarantees regarding the parent BlockTransferListener. In
|
||||||
* particular, the listener will be invoked exactly once per blockId, with a success or failure.
|
* particular, the listener will be invoked exactly once per blockId, with a success or failure.
|
||||||
*/
|
*/
|
||||||
public class RetryingBlockFetcher {
|
public class RetryingBlockTransferor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
|
* Used to initiate the first transfer for all blocks, and subsequently for retrying the
|
||||||
* remaining blocks.
|
* transfer on any remaining blocks.
|
||||||
*/
|
*/
|
||||||
public interface BlockFetchStarter {
|
public interface BlockTransferStarter {
|
||||||
/**
|
/**
|
||||||
* Creates a new BlockFetcher to fetch the given block ids which may do some synchronous
|
* Creates a new BlockFetcher or BlockPusher to transfer the given block ids which may do
|
||||||
* bootstrapping followed by fully asynchronous block fetching.
|
* some synchronous bootstrapping followed by fully asynchronous block transferring.
|
||||||
* The BlockFetcher must eventually invoke the Listener on every input blockId, or else this
|
* The BlockFetcher or BlockPusher must eventually invoke the Listener on every input blockId,
|
||||||
* method must throw an exception.
|
* or else this method must throw an exception.
|
||||||
*
|
*
|
||||||
* This method should always attempt to get a new TransportClient from the
|
* This method should always attempt to get a new TransportClient from the
|
||||||
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
|
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
|
||||||
* issues.
|
* issues.
|
||||||
*/
|
*/
|
||||||
void createAndStart(String[] blockIds, BlockFetchingListener listener)
|
void createAndStart(String[] blockIds, BlockTransferListener listener)
|
||||||
throws IOException, InterruptedException;
|
throws IOException, InterruptedException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Shared executor service used for waiting and retrying. */
|
/** Shared executor service used for waiting and retrying. */
|
||||||
private static final ExecutorService executorService = Executors.newCachedThreadPool(
|
private static final ExecutorService executorService = Executors.newCachedThreadPool(
|
||||||
NettyUtils.createThreadFactory("Block Fetch Retry"));
|
NettyUtils.createThreadFactory("Block Transfer Retry"));
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class);
|
private static final Logger logger = LoggerFactory.getLogger(RetryingBlockTransferor.class);
|
||||||
|
|
||||||
/** Used to initiate new Block Fetches on our remaining blocks. */
|
/** Used to initiate new Block transfer on our remaining blocks. */
|
||||||
private final BlockFetchStarter fetchStarter;
|
private final BlockTransferStarter transferStarter;
|
||||||
|
|
||||||
/** Parent listener which we delegate all successful or permanently failed block fetches to. */
|
/** Parent listener which we delegate all successful or permanently failed block transfers to. */
|
||||||
private final BlockFetchingListener listener;
|
private final BlockTransferListener listener;
|
||||||
|
|
||||||
/** Max number of times we are allowed to retry. */
|
/** Max number of times we are allowed to retry. */
|
||||||
private final int maxRetries;
|
private final int maxRetries;
|
||||||
|
@ -86,80 +86,82 @@ public class RetryingBlockFetcher {
|
||||||
private int retryCount = 0;
|
private int retryCount = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set of all block ids which have not been fetched successfully or with a non-IO Exception.
|
* Set of all block ids which have not been transferred successfully or with a non-IO Exception.
|
||||||
* A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
|
* A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
|
||||||
* input ordering is preserved, so we always request blocks in the same order the user provided.
|
* input ordering is preserved, so we always request blocks in the same order the user provided.
|
||||||
*/
|
*/
|
||||||
private final LinkedHashSet<String> outstandingBlocksIds;
|
private final LinkedHashSet<String> outstandingBlocksIds;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The BlockFetchingListener that is active with our current BlockFetcher.
|
* The BlockTransferListener that is active with our current BlockFetcher.
|
||||||
* When we start a retry, we immediately replace this with a new Listener, which causes all any
|
* When we start a retry, we immediately replace this with a new Listener, which causes all any
|
||||||
* old Listeners to ignore all further responses.
|
* old Listeners to ignore all further responses.
|
||||||
*/
|
*/
|
||||||
private RetryingBlockFetchListener currentListener;
|
private RetryingBlockTransferListener currentListener;
|
||||||
|
|
||||||
private final ErrorHandler errorHandler;
|
private final ErrorHandler errorHandler;
|
||||||
|
|
||||||
public RetryingBlockFetcher(
|
public RetryingBlockTransferor(
|
||||||
TransportConf conf,
|
TransportConf conf,
|
||||||
RetryingBlockFetcher.BlockFetchStarter fetchStarter,
|
BlockTransferStarter transferStarter,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
BlockFetchingListener listener,
|
BlockTransferListener listener,
|
||||||
ErrorHandler errorHandler) {
|
ErrorHandler errorHandler) {
|
||||||
this.fetchStarter = fetchStarter;
|
this.transferStarter = transferStarter;
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
this.maxRetries = conf.maxIORetries();
|
this.maxRetries = conf.maxIORetries();
|
||||||
this.retryWaitTime = conf.ioRetryWaitTimeMs();
|
this.retryWaitTime = conf.ioRetryWaitTimeMs();
|
||||||
this.outstandingBlocksIds = Sets.newLinkedHashSet();
|
this.outstandingBlocksIds = Sets.newLinkedHashSet();
|
||||||
Collections.addAll(outstandingBlocksIds, blockIds);
|
Collections.addAll(outstandingBlocksIds, blockIds);
|
||||||
this.currentListener = new RetryingBlockFetchListener();
|
this.currentListener = new RetryingBlockTransferListener();
|
||||||
this.errorHandler = errorHandler;
|
this.errorHandler = errorHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RetryingBlockFetcher(
|
public RetryingBlockTransferor(
|
||||||
TransportConf conf,
|
TransportConf conf,
|
||||||
BlockFetchStarter fetchStarter,
|
BlockTransferStarter transferStarter,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
BlockFetchingListener listener) {
|
BlockFetchingListener listener) {
|
||||||
this(conf, fetchStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER);
|
this(conf, transferStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initiates the fetch of all blocks provided in the constructor, with possible retries in the
|
* Initiates the transfer of all blocks provided in the constructor, with possible retries
|
||||||
* event of transient IOExceptions.
|
* in the event of transient IOExceptions.
|
||||||
*/
|
*/
|
||||||
public void start() {
|
public void start() {
|
||||||
fetchAllOutstanding();
|
transferAllOutstanding();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fires off a request to fetch all blocks that have not been fetched successfully or permanently
|
* Fires off a request to transfer all blocks that have not been transferred successfully or
|
||||||
* failed (i.e., by a non-IOException).
|
* permanently failed (i.e., by a non-IOException).
|
||||||
*/
|
*/
|
||||||
private void fetchAllOutstanding() {
|
private void transferAllOutstanding() {
|
||||||
// Start by retrieving our shared state within a synchronized block.
|
// Start by retrieving our shared state within a synchronized block.
|
||||||
String[] blockIdsToFetch;
|
String[] blockIdsToTransfer;
|
||||||
int numRetries;
|
int numRetries;
|
||||||
RetryingBlockFetchListener myListener;
|
RetryingBlockTransferListener myListener;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
|
blockIdsToTransfer = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
|
||||||
numRetries = retryCount;
|
numRetries = retryCount;
|
||||||
myListener = currentListener;
|
myListener = currentListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
|
// Now initiate the transfer on all outstanding blocks, possibly initiating a retry if that
|
||||||
|
// fails.
|
||||||
try {
|
try {
|
||||||
fetchStarter.createAndStart(blockIdsToFetch, myListener);
|
transferStarter.createAndStart(blockIdsToTransfer, myListener);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
|
logger.error(String.format("Exception while beginning %s of %s outstanding blocks %s",
|
||||||
blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
|
listener.getTransferType(), blockIdsToTransfer.length,
|
||||||
|
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
|
||||||
|
|
||||||
if (shouldRetry(e)) {
|
if (shouldRetry(e)) {
|
||||||
initiateRetry();
|
initiateRetry();
|
||||||
} else {
|
} else {
|
||||||
for (String bid : blockIdsToFetch) {
|
for (String bid : blockIdsToTransfer) {
|
||||||
listener.onBlockFetchFailure(bid, e);
|
listener.onBlockTransferFailure(bid, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -167,23 +169,24 @@ public class RetryingBlockFetcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lightweight method which initiates a retry in a different thread. The retry will involve
|
* Lightweight method which initiates a retry in a different thread. The retry will involve
|
||||||
* calling fetchAllOutstanding() after a configured wait time.
|
* calling transferAllOutstanding() after a configured wait time.
|
||||||
*/
|
*/
|
||||||
private synchronized void initiateRetry() {
|
private synchronized void initiateRetry() {
|
||||||
retryCount += 1;
|
retryCount += 1;
|
||||||
currentListener = new RetryingBlockFetchListener();
|
currentListener = new RetryingBlockTransferListener();
|
||||||
|
|
||||||
logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
|
logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms",
|
||||||
retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
|
listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(),
|
||||||
|
retryWaitTime);
|
||||||
|
|
||||||
executorService.submit(() -> {
|
executorService.submit(() -> {
|
||||||
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
|
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
|
||||||
fetchAllOutstanding();
|
transferAllOutstanding();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if we should retry due a block fetch failure. We will retry if and only if
|
* Returns true if we should retry due a block transfer failure. We will retry if and only if
|
||||||
* the exception was an IOException and we haven't retried 'maxRetries' times already.
|
* the exception was an IOException and we haven't retried 'maxRetries' times already.
|
||||||
*/
|
*/
|
||||||
private synchronized boolean shouldRetry(Throwable e) {
|
private synchronized boolean shouldRetry(Throwable e) {
|
||||||
|
@ -194,17 +197,17 @@ public class RetryingBlockFetcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Our RetryListener intercepts block fetch responses and forwards them to our parent listener.
|
* Our RetryListener intercepts block transfer responses and forwards them to our parent
|
||||||
* Note that in the event of a retry, we will immediately replace the 'currentListener' field,
|
* listener. Note that in the event of a retry, we will immediately replace the 'currentListener'
|
||||||
* indicating that any responses from non-current Listeners should be ignored.
|
* field, indicating that any responses from non-current Listeners should be ignored.
|
||||||
*/
|
*/
|
||||||
private class RetryingBlockFetchListener implements BlockFetchingListener {
|
private class RetryingBlockTransferListener implements
|
||||||
@Override
|
BlockFetchingListener, BlockPushingListener {
|
||||||
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
|
private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
|
||||||
// We will only forward this success message to our parent listener if this block request is
|
// We will only forward this success message to our parent listener if this block request is
|
||||||
// outstanding and we are still the active listener.
|
// outstanding and we are still the active listener.
|
||||||
boolean shouldForwardSuccess = false;
|
boolean shouldForwardSuccess = false;
|
||||||
synchronized (RetryingBlockFetcher.this) {
|
synchronized (RetryingBlockTransferor.this) {
|
||||||
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
|
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
|
||||||
outstandingBlocksIds.remove(blockId);
|
outstandingBlocksIds.remove(blockId);
|
||||||
shouldForwardSuccess = true;
|
shouldForwardSuccess = true;
|
||||||
|
@ -213,28 +216,27 @@ public class RetryingBlockFetcher {
|
||||||
|
|
||||||
// Now actually invoke the parent listener, outside of the synchronized block.
|
// Now actually invoke the parent listener, outside of the synchronized block.
|
||||||
if (shouldForwardSuccess) {
|
if (shouldForwardSuccess) {
|
||||||
listener.onBlockFetchSuccess(blockId, data);
|
listener.onBlockTransferSuccess(blockId, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void handleBlockTransferFailure(String blockId, Throwable exception) {
|
||||||
public void onBlockFetchFailure(String blockId, Throwable exception) {
|
|
||||||
// We will only forward this failure to our parent listener if this block request is
|
// We will only forward this failure to our parent listener if this block request is
|
||||||
// outstanding, we are still the active listener, AND we cannot retry the fetch.
|
// outstanding, we are still the active listener, AND we cannot retry the transfer.
|
||||||
boolean shouldForwardFailure = false;
|
boolean shouldForwardFailure = false;
|
||||||
synchronized (RetryingBlockFetcher.this) {
|
synchronized (RetryingBlockTransferor.this) {
|
||||||
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
|
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
|
||||||
if (shouldRetry(exception)) {
|
if (shouldRetry(exception)) {
|
||||||
initiateRetry();
|
initiateRetry();
|
||||||
} else {
|
} else {
|
||||||
if (errorHandler.shouldLogError(exception)) {
|
if (errorHandler.shouldLogError(exception)) {
|
||||||
logger.error(
|
logger.error(
|
||||||
String.format("Failed to fetch block %s, and will not retry (%s retries)",
|
String.format("Failed to %s block %s, and will not retry (%s retries)",
|
||||||
blockId, retryCount), exception);
|
listener.getTransferType(), blockId, retryCount), exception);
|
||||||
} else {
|
} else {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
String.format("Failed to fetch block %s, and will not retry (%s retries)",
|
String.format("Failed to %s block %s, and will not retry (%s retries)",
|
||||||
blockId, retryCount), exception);
|
listener.getTransferType(), blockId, retryCount), exception);
|
||||||
}
|
}
|
||||||
outstandingBlocksIds.remove(blockId);
|
outstandingBlocksIds.remove(blockId);
|
||||||
shouldForwardFailure = true;
|
shouldForwardFailure = true;
|
||||||
|
@ -244,8 +246,48 @@ public class RetryingBlockFetcher {
|
||||||
|
|
||||||
// Now actually invoke the parent listener, outside of the synchronized block.
|
// Now actually invoke the parent listener, outside of the synchronized block.
|
||||||
if (shouldForwardFailure) {
|
if (shouldForwardFailure) {
|
||||||
listener.onBlockFetchFailure(blockId, exception);
|
listener.onBlockTransferFailure(blockId, exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
|
||||||
|
handleBlockTransferSuccess(blockId, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBlockFetchFailure(String blockId, Throwable exception) {
|
||||||
|
handleBlockTransferFailure(blockId, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBlockPushSuccess(String blockId, ManagedBuffer data) {
|
||||||
|
handleBlockTransferSuccess(blockId, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBlockPushFailure(String blockId, Throwable exception) {
|
||||||
|
handleBlockTransferFailure(blockId, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetryingBlockTransferListener's onBlockTransferSuccess and onBlockTransferFailure
|
||||||
|
// shouldn't be invoked. We only invoke these 2 methods on the parent listener.
|
||||||
|
@Override
|
||||||
|
public void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Invocation on RetryingBlockTransferListener.onBlockTransferSuccess is unexpected.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBlockTransferFailure(String blockId, Throwable exception) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Invocation on RetryingBlockTransferListener.onBlockTransferFailure is unexpected.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTransferType() {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Invocation on RetryingBlockTransferListener.getTransferType is unexpected.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -48,12 +48,12 @@ public class OneForOneBlockPusherSuite {
|
||||||
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
|
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
|
||||||
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
||||||
|
|
||||||
BlockFetchingListener listener = pushBlocks(
|
BlockPushingListener listener = pushBlocks(
|
||||||
blocks,
|
blocks,
|
||||||
blockIds,
|
blockIds,
|
||||||
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
|
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
|
||||||
|
|
||||||
verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
|
verify(listener).onBlockPushSuccess(eq("shufflePush_0_0_0"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -64,16 +64,16 @@ public class OneForOneBlockPusherSuite {
|
||||||
blocks.put("shufflePush_0_2_0", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
|
blocks.put("shufflePush_0_2_0", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
|
||||||
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
||||||
|
|
||||||
BlockFetchingListener listener = pushBlocks(
|
BlockPushingListener listener = pushBlocks(
|
||||||
blocks,
|
blocks,
|
||||||
blockIds,
|
blockIds,
|
||||||
Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0),
|
Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0),
|
||||||
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
||||||
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
||||||
|
|
||||||
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
|
verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any());
|
||||||
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
|
verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_1_0"), any());
|
||||||
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_2_0"), any());
|
verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_2_0"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -84,16 +84,16 @@ public class OneForOneBlockPusherSuite {
|
||||||
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
|
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
|
||||||
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
||||||
|
|
||||||
BlockFetchingListener listener = pushBlocks(
|
BlockPushingListener listener = pushBlocks(
|
||||||
blocks,
|
blocks,
|
||||||
blockIds,
|
blockIds,
|
||||||
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
|
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
|
||||||
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
||||||
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
||||||
|
|
||||||
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
|
verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any());
|
||||||
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
|
verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), any());
|
||||||
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
|
verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_2_0"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -104,18 +104,18 @@ public class OneForOneBlockPusherSuite {
|
||||||
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
|
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
|
||||||
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
|
||||||
|
|
||||||
BlockFetchingListener listener = pushBlocks(
|
BlockPushingListener listener = pushBlocks(
|
||||||
blocks,
|
blocks,
|
||||||
blockIds,
|
blockIds,
|
||||||
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
|
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
|
||||||
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
|
||||||
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
|
||||||
|
|
||||||
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
|
verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), any());
|
||||||
verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
|
verify(listener, times(0)).onBlockPushSuccess(not(eq("shufflePush_0_0_0")), any());
|
||||||
verify(listener, times(0)).onBlockFetchFailure(eq("shufflePush_0_0_0"), any());
|
verify(listener, times(0)).onBlockPushFailure(eq("shufflePush_0_0_0"), any());
|
||||||
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
|
verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), any());
|
||||||
verify(listener, times(2)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
|
verify(listener, times(2)).onBlockPushFailure(eq("shufflePush_0_2_0"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -123,12 +123,12 @@ public class OneForOneBlockPusherSuite {
|
||||||
* If a block is an empty byte, a server side retriable exception will be thrown.
|
* If a block is an empty byte, a server side retriable exception will be thrown.
|
||||||
* If a block is null, a non-retriable exception will be thrown.
|
* If a block is null, a non-retriable exception will be thrown.
|
||||||
*/
|
*/
|
||||||
private static BlockFetchingListener pushBlocks(
|
private static BlockPushingListener pushBlocks(
|
||||||
LinkedHashMap<String, ManagedBuffer> blocks,
|
LinkedHashMap<String, ManagedBuffer> blocks,
|
||||||
String[] blockIds,
|
String[] blockIds,
|
||||||
Iterable<BlockTransferMessage> expectMessages) {
|
Iterable<BlockTransferMessage> expectMessages) {
|
||||||
TransportClient client = mock(TransportClient.class);
|
TransportClient client = mock(TransportClient.class);
|
||||||
BlockFetchingListener listener = mock(BlockFetchingListener.class);
|
BlockPushingListener listener = mock(BlockPushingListener.class);
|
||||||
OneForOneBlockPusher pusher =
|
OneForOneBlockPusher pusher =
|
||||||
new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks);
|
new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks);
|
||||||
|
|
||||||
|
|
|
@ -38,13 +38,13 @@ import org.apache.spark.network.buffer.ManagedBuffer;
|
||||||
import org.apache.spark.network.buffer.NioManagedBuffer;
|
import org.apache.spark.network.buffer.NioManagedBuffer;
|
||||||
import org.apache.spark.network.util.MapConfigProvider;
|
import org.apache.spark.network.util.MapConfigProvider;
|
||||||
import org.apache.spark.network.util.TransportConf;
|
import org.apache.spark.network.util.TransportConf;
|
||||||
import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
|
import static org.apache.spark.network.shuffle.RetryingBlockTransferor.BlockTransferStarter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
|
* Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
|
||||||
* fetch the lost blocks.
|
* fetch the lost blocks.
|
||||||
*/
|
*/
|
||||||
public class RetryingBlockFetcherSuite {
|
public class RetryingBlockTransferorSuite {
|
||||||
|
|
||||||
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
|
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
|
||||||
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
|
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
|
||||||
|
@ -64,8 +64,8 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener).onBlockFetchSuccess("b0", block0);
|
verify(listener).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener).onBlockFetchSuccess("b1", block1);
|
verify(listener).onBlockTransferSuccess("b1", block1);
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,8 +83,9 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener).onBlockFetchFailure(eq("b0"), any());
|
verify(listener).onBlockTransferFailure(eq("b0"), any());
|
||||||
verify(listener).onBlockFetchSuccess("b1", block1);
|
verify(listener).onBlockTransferSuccess("b1", block1);
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,8 +107,9 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,8 +130,9 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,8 +159,9 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,8 +192,9 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
|
verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,9 +223,10 @@ public class RetryingBlockFetcherSuite {
|
||||||
|
|
||||||
performInteractions(interactions, listener);
|
performInteractions(interactions, listener);
|
||||||
|
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
|
||||||
verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
|
verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
|
||||||
verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
|
verify(listener, timeout(5000)).onBlockTransferSuccess("b2", block2);
|
||||||
|
verify(listener, atLeastOnce()).getTransferType();
|
||||||
verifyNoMoreInteractions(listener);
|
verifyNoMoreInteractions(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +249,7 @@ public class RetryingBlockFetcherSuite {
|
||||||
"spark.shuffle.io.maxRetries", "2",
|
"spark.shuffle.io.maxRetries", "2",
|
||||||
"spark.shuffle.io.retryWait", "0"));
|
"spark.shuffle.io.retryWait", "0"));
|
||||||
TransportConf conf = new TransportConf("shuffle", provider);
|
TransportConf conf = new TransportConf("shuffle", provider);
|
||||||
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
|
BlockTransferStarter fetchStarter = mock(BlockTransferStarter.class);
|
||||||
|
|
||||||
Stubber stub = null;
|
Stubber stub = null;
|
||||||
|
|
||||||
|
@ -293,6 +299,6 @@ public class RetryingBlockFetcherSuite {
|
||||||
assertNotNull(stub);
|
assertNotNull(stub);
|
||||||
stub.when(fetchStarter).createAndStart(any(), any());
|
stub.when(fetchStarter).createAndStart(any(), any());
|
||||||
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
|
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
|
||||||
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
|
new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener).start();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -36,7 +36,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
|
||||||
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap}
|
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap}
|
||||||
import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
|
import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
|
||||||
import org.apache.spark.network.server._
|
import org.apache.spark.network.server._
|
||||||
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
|
import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockTransferor}
|
||||||
import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
|
import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
|
||||||
import org.apache.spark.network.util.JavaUtils
|
import org.apache.spark.network.util.JavaUtils
|
||||||
import org.apache.spark.rpc.RpcEndpointRef
|
import org.apache.spark.rpc.RpcEndpointRef
|
||||||
|
@ -116,13 +116,15 @@ private[spark] class NettyBlockTransferService(
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
val maxRetries = transportConf.maxIORetries()
|
val maxRetries = transportConf.maxIORetries()
|
||||||
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
|
val blockFetchStarter = new RetryingBlockTransferor.BlockTransferStarter {
|
||||||
override def createAndStart(blockIds: Array[String],
|
override def createAndStart(blockIds: Array[String],
|
||||||
listener: BlockFetchingListener): Unit = {
|
listener: BlockTransferListener): Unit = {
|
||||||
|
assert(listener.isInstanceOf[BlockFetchingListener],
|
||||||
|
s"Expecting a BlockFetchingListener, but got ${listener.getClass}")
|
||||||
try {
|
try {
|
||||||
val client = clientFactory.createClient(host, port, maxRetries > 0)
|
val client = clientFactory.createClient(host, port, maxRetries > 0)
|
||||||
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
|
new OneForOneBlockFetcher(client, appId, execId, blockIds,
|
||||||
transportConf, tempFileManager).start()
|
listener.asInstanceOf[BlockFetchingListener], transportConf, tempFileManager).start()
|
||||||
} catch {
|
} catch {
|
||||||
case e: IOException =>
|
case e: IOException =>
|
||||||
Try {
|
Try {
|
||||||
|
@ -140,7 +142,7 @@ private[spark] class NettyBlockTransferService(
|
||||||
if (maxRetries > 0) {
|
if (maxRetries > 0) {
|
||||||
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
|
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
|
||||||
// a bug in this code. We should remove the if statement once we're sure of the stability.
|
// a bug in this code. We should remove the if statement once we're sure of the stability.
|
||||||
new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
|
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start()
|
||||||
} else {
|
} else {
|
||||||
blockFetchStarter.createAndStart(blockIds, listener)
|
blockFetchStarter.createAndStart(blockIds, listener)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.spark.internal.config._
|
||||||
import org.apache.spark.launcher.SparkLauncher
|
import org.apache.spark.launcher.SparkLauncher
|
||||||
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
|
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
|
||||||
import org.apache.spark.network.netty.SparkTransportConf
|
import org.apache.spark.network.netty.SparkTransportConf
|
||||||
import org.apache.spark.network.shuffle.BlockFetchingListener
|
import org.apache.spark.network.shuffle.BlockPushingListener
|
||||||
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
|
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
|
||||||
import org.apache.spark.network.util.TransportConf
|
import org.apache.spark.network.util.TransportConf
|
||||||
import org.apache.spark.shuffle.ShuffleBlockPusher._
|
import org.apache.spark.shuffle.ShuffleBlockPusher._
|
||||||
|
@ -205,7 +205,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
|
||||||
val blockIds = request.blocks.map(_._1.toString)
|
val blockIds = request.blocks.map(_._1.toString)
|
||||||
val remainingBlocks = new HashSet[String]() ++= blockIds
|
val remainingBlocks = new HashSet[String]() ++= blockIds
|
||||||
|
|
||||||
val blockPushListener = new BlockFetchingListener {
|
val blockPushListener = new BlockPushingListener {
|
||||||
// Initiating a connection and pushing blocks to a remote shuffle service is always handled by
|
// Initiating a connection and pushing blocks to a remote shuffle service is always handled by
|
||||||
// the block-push-threads. We should not initiate the connection creation in the
|
// the block-push-threads. We should not initiate the connection creation in the
|
||||||
// blockPushListener callbacks which are invoked by the netty eventloop because:
|
// blockPushListener callbacks which are invoked by the netty eventloop because:
|
||||||
|
@ -224,12 +224,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
|
override def onBlockPushSuccess(blockId: String, data: ManagedBuffer): Unit = {
|
||||||
logTrace(s"Push for block $blockId to $address successful.")
|
logTrace(s"Push for block $blockId to $address successful.")
|
||||||
handleResult(PushResult(blockId, null))
|
handleResult(PushResult(blockId, null))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
|
override def onBlockPushFailure(blockId: String, exception: Throwable): Unit = {
|
||||||
// check the message or it's cause to see it needs to be logged.
|
// check the message or it's cause to see it needs to be logged.
|
||||||
if (!errorHandler.shouldLogError(exception)) {
|
if (!errorHandler.shouldLogError(exception)) {
|
||||||
logTrace(s"Pushing block $blockId to $address failed.", exception)
|
logTrace(s"Pushing block $blockId to $address failed.", exception)
|
||||||
|
|
|
@ -114,7 +114,7 @@ class NettyBlockTransferServiceSuite
|
||||||
|
|
||||||
val listener = mock(classOf[BlockFetchingListener])
|
val listener = mock(classOf[BlockFetchingListener])
|
||||||
var hitExecutorDeadException = false
|
var hitExecutorDeadException = false
|
||||||
when(listener.onBlockFetchFailure(any(), any(classOf[ExecutorDeadException])))
|
when(listener.onBlockTransferFailure(any(), any(classOf[ExecutorDeadException])))
|
||||||
.thenAnswer(_ => {hitExecutorDeadException = true})
|
.thenAnswer(_ => {hitExecutorDeadException = true})
|
||||||
|
|
||||||
service0 = createService(port, driverEndpointRef)
|
service0 = createService(port, driverEndpointRef)
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
|
||||||
|
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
import org.apache.spark.network.buffer.ManagedBuffer
|
import org.apache.spark.network.buffer.ManagedBuffer
|
||||||
import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockStoreClient}
|
import org.apache.spark.network.shuffle.{BlockPushingListener, BlockStoreClient}
|
||||||
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
|
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
|
||||||
import org.apache.spark.network.util.TransportConf
|
import org.apache.spark.network.util.TransportConf
|
||||||
import org.apache.spark.serializer.JavaSerializer
|
import org.apache.spark.serializer.JavaSerializer
|
||||||
|
@ -75,9 +75,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
pushedBlocks ++= blocks
|
pushedBlocks ++= blocks
|
||||||
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
|
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
|
(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
|
||||||
blockFetchListener.onBlockFetchSuccess(blockId, buffer)
|
blockPushListener.onBlockPushSuccess(blockId, buffer)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -164,24 +164,24 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
test("Hit maxBlocksInFlightPerAddress limit so that the blocks are deferred") {
|
test("Hit maxBlocksInFlightPerAddress limit so that the blocks are deferred") {
|
||||||
conf.set("spark.reducer.maxBlocksInFlightPerAddress", "2")
|
conf.set("spark.reducer.maxBlocksInFlightPerAddress", "2")
|
||||||
var blockPendingResponse : String = null
|
var blockPendingResponse : String = null
|
||||||
var listener : BlockFetchingListener = null
|
var listener : BlockPushingListener = null
|
||||||
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
||||||
.thenAnswer((invocation: InvocationOnMock) => {
|
.thenAnswer((invocation: InvocationOnMock) => {
|
||||||
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
pushedBlocks ++= blocks
|
pushedBlocks ++= blocks
|
||||||
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
|
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
// Expecting 2 blocks
|
// Expecting 2 blocks
|
||||||
assert(blocks.length == 2)
|
assert(blocks.length == 2)
|
||||||
if (blockPendingResponse == null) {
|
if (blockPendingResponse == null) {
|
||||||
blockPendingResponse = blocks(1)
|
blockPendingResponse = blocks(1)
|
||||||
listener = blockFetchListener
|
listener = blockPushListener
|
||||||
// Respond with success only for the first block which will cause all the rest of the
|
// Respond with success only for the first block which will cause all the rest of the
|
||||||
// blocks to be deferred
|
// blocks to be deferred
|
||||||
blockFetchListener.onBlockFetchSuccess(blocks(0), managedBuffers(0))
|
blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0))
|
||||||
} else {
|
} else {
|
||||||
(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
|
(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
|
||||||
blockFetchListener.onBlockFetchSuccess(blockId, buffer)
|
blockPushListener.onBlockPushSuccess(blockId, buffer)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -193,7 +193,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
.pushBlocks(any(), any(), any(), any(), any())
|
.pushBlocks(any(), any(), any(), any(), any())
|
||||||
assert(pushedBlocks.length == 2)
|
assert(pushedBlocks.length == 2)
|
||||||
// this will trigger push of deferred blocks
|
// this will trigger push of deferred blocks
|
||||||
listener.onBlockFetchSuccess(blockPendingResponse, mock(classOf[ManagedBuffer]))
|
listener.onBlockPushSuccess(blockPendingResponse, mock(classOf[ManagedBuffer]))
|
||||||
pusher.runPendingTasks()
|
pusher.runPendingTasks()
|
||||||
verify(shuffleClient, times(4))
|
verify(shuffleClient, times(4))
|
||||||
.pushBlocks(any(), any(), any(), any(), any())
|
.pushBlocks(any(), any(), any(), any(), any())
|
||||||
|
@ -248,17 +248,17 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
||||||
.thenAnswer((invocation: InvocationOnMock) => {
|
.thenAnswer((invocation: InvocationOnMock) => {
|
||||||
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
blocks.foreach(blockId => {
|
blocks.foreach(blockId => {
|
||||||
if (failBlock) {
|
if (failBlock) {
|
||||||
failBlock = false
|
failBlock = false
|
||||||
// Fail the first block with the collision exception.
|
// Fail the first block with the collision exception.
|
||||||
blockFetchListener.onBlockFetchFailure(blockId, new RuntimeException(
|
blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
|
||||||
new IllegalArgumentException(
|
new IllegalArgumentException(
|
||||||
BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
|
BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
|
||||||
} else {
|
} else {
|
||||||
pushedBlocks += blockId
|
pushedBlocks += blockId
|
||||||
blockFetchListener.onBlockFetchSuccess(blockId, mock(classOf[ManagedBuffer]))
|
blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -278,16 +278,16 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
||||||
.thenAnswer((invocation: InvocationOnMock) => {
|
.thenAnswer((invocation: InvocationOnMock) => {
|
||||||
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
blocks.foreach(blockId => {
|
blocks.foreach(blockId => {
|
||||||
if (failBlock) {
|
if (failBlock) {
|
||||||
failBlock = false
|
failBlock = false
|
||||||
// Fail the first block with the too late exception.
|
// Fail the first block with the too late exception.
|
||||||
blockFetchListener.onBlockFetchFailure(blockId, new RuntimeException(
|
blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
|
||||||
new IllegalArgumentException(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)))
|
new IllegalArgumentException(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)))
|
||||||
} else {
|
} else {
|
||||||
pushedBlocks += blockId
|
pushedBlocks += blockId
|
||||||
blockFetchListener.onBlockFetchSuccess(blockId, mock(classOf[ManagedBuffer]))
|
blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -307,9 +307,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
.thenAnswer((invocation: InvocationOnMock) => {
|
.thenAnswer((invocation: InvocationOnMock) => {
|
||||||
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
pushedBlocks ++= blocks
|
pushedBlocks ++= blocks
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
blocks.foreach(blockId => {
|
blocks.foreach(blockId => {
|
||||||
blockFetchListener.onBlockFetchFailure(
|
blockPushListener.onBlockPushFailure(
|
||||||
blockId, new RuntimeException(new ConnectException()))
|
blockId, new RuntimeException(new ConnectException()))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -332,9 +332,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||||
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
|
||||||
.thenAnswer((invocation: InvocationOnMock) => {
|
.thenAnswer((invocation: InvocationOnMock) => {
|
||||||
val pushedBlocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
val pushedBlocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
|
||||||
val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
|
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
|
||||||
pushedBlocks.foreach(blockId => {
|
pushedBlocks.foreach(blockId => {
|
||||||
blockFetchListener.onBlockFetchFailure(
|
blockPushListener.onBlockPushFailure(
|
||||||
blockId, new IOException("Failed to send RPC",
|
blockId, new IOException("Failed to send RPC",
|
||||||
new FileNotFoundException("file not found")))
|
new FileNotFoundException("file not found")))
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue