diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
index bd173b653e..3ee524a85d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
@@ -17,6 +17,16 @@
 
 package org.apache.spark.network.client;
 
+import java.nio.ByteBuffer;
+
 public interface StreamCallbackWithID extends StreamCallback {
   String getID();
+
+  /**
+   * Response to return to client upon the completion of a stream. Currently only invoked in
+   * {@link org.apache.spark.network.server.TransportRequestHandler#processStreamUpload}
+   */
+  default ByteBuffer getCompletionResponse() {
+    return ByteBuffer.allocate(0);
+  }
 }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
new file mode 100644
index 0000000000..5906fa2833
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A special RuntimeException thrown when the shuffle service experiences a non-fatal failure
+ * while handling block push requests for push-based shuffle. Due to the best-effort nature
+ * of push-based shuffle, this exception gets thrown under relatively common scenarios, such
+ * as when a pushed block is received after the corresponding shuffle has been merge
+ * finalized, or when a pushed block experiences a merge collision. Under these scenarios,
+ * we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+  /**
+   * String constant used for generating exception messages indicating a block to be merged
+   * arrives too late on the server side. When we get a block push failure because the
+   * block arrives too late, we will not retry pushing the block nor log the exception on
+   * the client side.
+   */
+  public static final String TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is received after merged shuffle is finalized";
+
+  /**
+   * String constant used for generating exception messages indicating a block to be merged
+   * is a stale block push in the case of indeterminate stage retries on the server side.
+   * When we get a block push failure because the block push is stale, we will not
+   * retry pushing the block nor log the exception on the client side.
+   */
+  public static final String STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is a stale block push from an indeterminate stage retry";
+
+  /**
+   * String constant used for generating exception messages indicating the server couldn't
+   * append a block after all available attempts due to collision with other blocks belonging
+   * to the same shuffle partition. When we get a block push failure because the block
+   * couldn't be written for this reason, we will not log the exception on the client side.
+   */
+  public static final String BLOCK_APPEND_COLLISION_MSG_SUFFIX =
+    " experienced merge collision on the server side";
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be responded back to the client.
+   * Instead of responding with an RpcFailure that carries the exception stack trace as the
+   * payload, which makes checking the content of the exception very tedious on the client
+   * side, we respond with a proper RpcResponse to make it more robust and efficient. This
+   * field is only set on the shuffle server side when the exception is originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side when a
+   * BlockPushNonFatalFailure is recreated from the error code received from the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response, String msg) {
+    super(msg);
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode, String msg) {
+    super(msg);
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is used to only convey the error code, we reduce the
+   * exception initialization overhead by skipping filling the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {
+    // Ensure we do not invoke this method if response is not set
+    Preconditions.checkNotNull(response);
+    return response;
+  }
+
+  public ReturnCode getReturnCode() {
+    // Ensure we do not invoke this method if returnCode is not set
+    Preconditions.checkNotNull(returnCode);
+    return returnCode;
+  }
+
+  public enum ReturnCode {
+    /**
+     * Indicate the case of a successful merge of a pushed block.
+     */
+    SUCCESS(0, ""),
+    /**
+     * Indicate a block to be merged arrives too late on the server side, i.e. after the
+     * corresponding shuffle has been merge finalized. When the client gets this code, it
+     * will not retry pushing the block.
+     */
+    TOO_LATE_BLOCK_PUSH(1, TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX),
+    /**
+     * Indicate the server couldn't append a block after all available attempts due to
+     * collision with other blocks belonging to the same shuffle partition.
+     */
+    BLOCK_APPEND_COLLISION_DETECTED(2, BLOCK_APPEND_COLLISION_MSG_SUFFIX),
+    /**
+     * Indicate a block received on the server side is a stale block push in the case of
+     * indeterminate stage retries. When the client receives this code, it will not retry
+     * pushing the block.
+     */
+    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+
+    private final byte id;
+    // Error message suffix used to generate an error message for a given ReturnCode and
+    // a given block ID
+    private final String errorMsgSuffix;
+
+    ReturnCode(int id, String errorMsgSuffix) {
+      assert id < 128 : "Cannot have more than 128 block push return codes";
+      this.id = (byte) id;
+      this.errorMsgSuffix = errorMsgSuffix;
+    }
+
+    public byte id() { return id; }
+  }
+
+  public static ReturnCode getReturnCode(byte id) {
+    switch (id) {
+      case 0: return ReturnCode.SUCCESS;
+      case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
+      case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
+      case 3: return ReturnCode.STALE_BLOCK_PUSH;
+      default: throw new IllegalArgumentException("Unknown block push return code: " + id);
+    }
+  }
+
+  public static String getErrorMsg(String blockId, ReturnCode errorCode) {
+    Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
+    return "Block " + blockId + errorCode.errorMsgSuffix;
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index ab2deac20f..5c07f20d3c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -213,7 +213,15 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
         public void onComplete(String streamId) throws IOException {
           try {
             streamHandler.onComplete(streamId);
-            callback.onSuccess(ByteBuffer.allocate(0));
+            callback.onSuccess(streamHandler.getCompletionResponse());
+          } catch (BlockPushNonFatalFailure ex) {
+            // Respond with an RPC message carrying the error code to the client instead of
+            // using an exception encoded in an RpcFailure. This type of exception gets thrown
+            // more frequently than a regular exception on the shuffle server side due to the
+            // best-effort nature of push-based shuffle and requires special handling on the
+            // client side. Using a proper RpcResponse is more efficient.
+            callback.onSuccess(ex.getResponse());
+            streamHandler.onFailure(streamId, ex);
           } catch (Exception ex) {
             IOException ioExc = new IOException("Failure post-processing complete stream;" +
               " failing this rpc and leaving channel active", ex);
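For context, the round trip that this handler change enables looks roughly as follows. This is a minimal standalone sketch, not part of the patch; the `blockId` value is made up, and it simply exercises the APIs introduced above (server encodes the return code into the response, client decodes it back).

```java
import java.nio.ByteBuffer;

import org.apache.spark.network.server.BlockPushNonFatalFailure;
import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;

public class ReturnCodeRoundTrip {
  public static void main(String[] args) {
    // Server side: a too-late block push is reported as a non-fatal failure whose
    // payload already carries the encoded BlockPushReturnCode message.
    String blockId = "shufflePush_0_0_1_0";  // hypothetical block ID
    BlockPushNonFatalFailure failure = new BlockPushNonFatalFailure(
      new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
      BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH));

    // TransportRequestHandler hands failure.getResponse() to callback.onSuccess(), so the
    // client sees a normal RPC response rather than an RpcFailure with a stack trace.
    ByteBuffer wireBytes = failure.getResponse();

    // Client side: decode the response back into a return code.
    BlockPushReturnCode code =
      (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(wireBytes);
    ReturnCode rc = BlockPushNonFatalFailure.getReturnCode(code.returnCode);
    System.out.println(rc + " for block " + code.failureBlockId);
  }
}
```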
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 0149ad7434..271d76220b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -23,6 +23,9 @@ import java.net.ConnectException;
 import com.google.common.base.Throwables;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+import static org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode.*;
 
 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
@@ -53,27 +56,6 @@ public interface ErrorHandler {
    * @since 3.1.0
    */
   class BlockPushErrorHandler implements ErrorHandler {
-    /**
-     * String constant used for generating exception messages indicating a block to be merged
-     * arrives too late or stale block push in the case of indeterminate stage retries on the
-     * server side, and also for later checking such exceptions on the client side. When we get
-     * a block push failure because of the block push being stale or arrives too late, we will
-     * not retry pushing the block nor log the exception on the client side.
-     */
-    public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
-      "received after merged shuffle is finalized or stale block push as shuffle blocks of a"
-        + " higher shuffleMergeId for the shuffle is being pushed";
-
-    /**
-     * String constant used for generating exception messages indicating the server couldn't
-     * append a block after all available attempts due to collision with other blocks belonging
-     * to the same shuffle partition, and also for later checking such exceptions on the client
-     * side. When we get a block push failure because of the block couldn't be written due to
-     * this reason, we will not log the exception on the client side.
-     */
-    public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
-      "Couldn't find an opportunity to write block";
-
     /**
      * String constant used for generating exception messages indicating the server encountered
      * IOExceptions multiple times, greater than the configured threshold, while trying to merged
@@ -105,16 +87,15 @@ public interface ErrorHandler {
         return false;
       }
 
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
       // If the block is too late or stale block push, there is no need to retry it
-      return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+      return !(t instanceof BlockPushNonFatalFailure &&
+        (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
+          ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
     }
 
     @Override
     public boolean shouldLogError(Throwable t) {
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
-      return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
-        errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+      return !(t instanceof BlockPushNonFatalFailure);
     }
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 826402c081..4c0e9f301a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -218,7 +218,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
         public void onSuccess(int numChunks, ManagedBuffer buffer) {
           logger.trace("Successfully got merged block meta for shuffleId {} shuffleMergeId {}"
             + " reduceId {}", shuffleId, shuffleMergeId, reduceId);
-          listener.onSuccess(shuffleId, reduceId, shuffleMergeId,
+          listener.onSuccess(shuffleId, shuffleMergeId, reduceId,
             new MergedBlockMeta(numChunks, buffer));
         }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index f9d313c254..8885dc9f2e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,10 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
 /**
@@ -77,42 +82,58 @@ public class OneForOneBlockPusher {
 
     @Override
     public void onSuccess(ByteBuffer response) {
-      // On receipt of a successful block push
-      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      BlockPushReturnCode pushResponse =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      // If the return code is not SUCCESS, the server has responded with an error code.
+      // Handle the error accordingly.
+      ReturnCode returnCode = BlockPushNonFatalFailure.getReturnCode(pushResponse.returnCode);
+      if (returnCode != ReturnCode.SUCCESS) {
+        String blockId = pushResponse.failureBlockId;
+        Preconditions.checkArgument(!blockId.isEmpty());
+        checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(returnCode,
+          BlockPushNonFatalFailure.getErrorMsg(blockId, returnCode)));
+      } else {
+        // On receipt of a successful block push
+        listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      }
     }
 
     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encounter a block push failure that's still
-      // retriable according to ErrorHandler (not a connection exception and the block is not too
-      // late), we should not fail all remaining block pushes even though
-      // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
-      // count etc). The best effort nature makes block push tolerable of a partial completion.
-      // Thus, we only fail the block that's actually failed in this case. Note that, on the
-      // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
-      // previous active retry listener, and retry pushing all outstanding blocks. However, since
-      // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
-      // blocks might have already succeeded, retry pushing all the outstanding blocks should be
-      // very cheap (on the client side, the block data is in memory; on the server side, the block
-      // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
-      // one block that's actually failed, we are essentially preventing forwarding unnecessary
-      // block push failures to the parent listener of the retry listener.
-      //
-      // Take the following as an example. For the common exception during block push handling,
-      // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
-      // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
-      // one block encountering this issue not the remaining blocks in the same batch. On the
-      // RetryingBlockTransferor side, since this exception is considered as not retriable, it
-      // would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
-      // blocks in the same batch would remain current and active and they won't be impacted by
-      // this exception.
-      if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
-        failRemainingBlocks(targetBlockId, e);
-      } else {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
-        failRemainingBlocks(targetBlockId, e);
-      }
+      checkAndFailRemainingBlocks(index, e);
     }
   }
 
+  private void checkAndFailRemainingBlocks(int index, Throwable e) {
+    // Since block push is best effort, i.e., if we encounter a block push failure that's still
+    // retriable according to ErrorHandler (not a connection exception and the block is not too
+    // late), we should not fail all remaining block pushes even though
+    // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
+    // count etc). The best-effort nature makes block push tolerant of partial completion.
+    // Thus, we only fail the block that's actually failed in this case. Note that, on the
+    // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
+    // previous active retry listener, and retry pushing all outstanding blocks. However, since
+    // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
+    // blocks might have already succeeded, retry pushing all the outstanding blocks should be
+    // very cheap (on the client side, the block data is in memory; on the server side, the block
+    // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
+    // one block that's actually failed, we are essentially preventing forwarding unnecessary
+    // block push failures to the parent listener of the retry listener.
+    //
+    // Take the following as an example. For the common exception during block push handling,
+    // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
+    // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
+    // one block encountering this issue, not the remaining blocks in the same batch. On the
+    // RetryingBlockTransferor side, since this exception is considered as not retriable, it
+    // would immediately invoke the parent listener's onBlockTransferFailure. However, the
+    // remaining blocks in the same batch would remain current and active and they won't be
+    // impacted by this exception.
+    if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+      failRemainingBlocks(targetBlockId, e);
+    } else {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
+      failRemainingBlocks(targetBlockId, e);
+    }
+  }
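The retry-vs-fail-all split above hinges on which slice of `blockIds` gets failed. A minimal illustration of that slicing (the names here are illustrative, not from the patch):

```java
import java.util.Arrays;

public class FailRangeDemo {
  public static void main(String[] args) {
    String[] blockIds = {"b0", "b1", "b2", "b3"};
    int index = 1;  // the block whose push just failed

    // Retriable error: fail only the one block that actually failed.
    String[] onlyFailed = Arrays.copyOfRange(blockIds, index, index + 1);
    // Non-retriable error: fail the failed block and everything after it.
    String[] failedAndRest = Arrays.copyOfRange(blockIds, index, blockIds.length);

    System.out.println(Arrays.toString(onlyFailed));     // [b1]
    System.out.println(Arrays.toString(failedAndRest));  // [b1, b2, b3]
  }
}
```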
"").toByteBuffer().asReadOnlyBuffer(); // ConcurrentHashMap doesn't allow null for keys or values which is why this is required. // Marker to identify finalized indeterminate shuffle partitions in the case of indeterminate @@ -101,7 +108,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private final TransportConf conf; private final int minChunkSize; private final int ioExceptionsThresholdDuringMerge; - private final ErrorHandler.BlockPushErrorHandler errorHandler; @SuppressWarnings("UnstableApiUsage") private final LoadingCache indexCache; @@ -119,7 +125,19 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { .maximumWeight(conf.mergedIndexCacheSize()) .weigher((Weigher)(file, indexInfo) -> indexInfo.getSize()) .build(ShuffleIndexInformation::new); - this.errorHandler = new ErrorHandler.BlockPushErrorHandler(); + } + + @VisibleForTesting + protected static ErrorHandler.BlockPushErrorHandler createErrorHandler() { + return new ErrorHandler.BlockPushErrorHandler() { + // Explicitly use a shuffle service side error handler for handling exceptions. + // BlockPushNonException on the server side only has the response field set. It + // might require different handling logic compared with a client side error handler. + @Override + public boolean shouldLogError(Throwable t) { + return !(t instanceof BlockPushNonFatalFailure); + } + }; } @VisibleForTesting @@ -140,7 +158,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { AppShuffleInfo appShuffleInfo, int shuffleId, int shuffleMergeId, - int reduceId) throws StaleBlockPushException { + int reduceId, + String blockId) throws BlockPushNonFatalFailure { ConcurrentMap shuffles = appShuffleInfo.shuffles; AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId = shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> { @@ -152,7 +171,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // In that case the block is considered late. 
In the case of indeterminate stages, most // recent shuffleMergeId finalized would be pointing to INDETERMINATE_SHUFFLE_FINALIZED if (dataFile.exists()) { - return null; + throw new BlockPushNonFatalFailure(new BlockPushReturnCode( + ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH)); } else { logger.info("Creating a new attempt for shuffle blocks push request for shuffle {}" + " with shuffleMergeId {} for application {}_{}", shuffleId, shuffleMergeId, @@ -164,10 +185,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // current incoming one int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId; if (latestShuffleMergeId > shuffleMergeId) { - throw new StaleBlockPushException(String.format("Rejecting shuffle blocks push request" - + " for shuffle %s with shuffleMergeId %s for application %s_%s as a higher" - + " shuffleMergeId %s request is already seen", shuffleId, shuffleMergeId, - appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId)); + throw new BlockPushNonFatalFailure( + new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), blockId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.STALE_BLOCK_PUSH)); } else if (latestShuffleMergeId == shuffleMergeId) { return appShuffleMergePartitionsInfo; } else { @@ -188,7 +208,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // It only gets here when the shuffle is already finalized. if (null == shufflePartitionsWithMergeId || INDETERMINATE_SHUFFLE_FINALIZED == shufflePartitionsWithMergeId.shuffleMergePartitions) { - return null; + throw new BlockPushNonFatalFailure( + new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(blockId, ReturnCode.TOO_LATE_BLOCK_PUSH)); } Map shuffleMergePartitions = @@ -373,9 +395,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); - final String streamId = String.format("%s_%d_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.shuffleMergeId, - msg.mapIndex, msg.reduceId); if (appShuffleInfo.attemptId != msg.appAttemptId) { // If this Block belongs to a former application attempt, it is considered late, // as only the blocks from the current application attempt will be merged @@ -385,14 +404,20 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { + "with the current attempt id %s stored in shuffle service for application %s", msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); } + // Use string concatenation here to avoid the overhead with String.format on every + // pushed block. 
+ final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_" + + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId; // Retrieve merged shuffle file metadata AppShufflePartitionInfo partitionInfoBeforeCheck; + BlockPushNonFatalFailure failure = null; try { partitionInfoBeforeCheck = getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId, - msg.shuffleMergeId, msg.reduceId); - } catch(StaleBlockPushException sbp) { + msg.shuffleMergeId, msg.reduceId, streamId); + } catch (BlockPushNonFatalFailure bpf) { // Set partitionInfoBeforeCheck to null so that stale block push gets handled. partitionInfoBeforeCheck = null; + failure = bpf; } // Here partitionInfo will be null in 3 cases: // 1) The request is received for a block that has already been merged, this is possible due @@ -436,17 +461,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // getting killed. When this happens, we need to distinguish the duplicate blocks as they // arrive. More details on this is explained in later comments. - // Track if the block is received after shuffle merge finalized or from an older - // shuffleMergeId attempt. - final boolean isStaleBlockOrTooLate = partitionInfoBeforeCheck == null; // Check if the given block is already merged by checking the bitmap against the given map // index - final AppShufflePartitionInfo partitionInfo = isStaleBlockOrTooLate ? null : + final AppShufflePartitionInfo partitionInfo = failure != null ? null : partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; if (partitionInfo != null) { return new PushBlockStreamCallback( this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex); } else { + final BlockPushNonFatalFailure finalFailure = failure; // For a duplicate block or a block which is late or stale block from an older // shuffleMergeId, respond back with a callback that handles them differently. return new StreamCallbackWithID() { @@ -463,11 +486,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public void onComplete(String streamId) { - if (isStaleBlockOrTooLate) { - // Throw an exception here so the block data is drained from channel and server - // responds RpcFailure to the client. - throw new RuntimeException(String.format("Block %s %s", streamId, - ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)); + // Throw non-fatal failure here so the block data is drained from channel and server + // responds the error code to the client. + if (finalFailure != null) { + throw finalFailure; } // For duplicate block that is received before the shuffle merge finalizes, the // server should respond success to the client. @@ -476,6 +498,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public void onFailure(String streamId, Throwable cause) { } + + @Override + public ByteBuffer getCompletionResponse() { + return SUCCESS_RESPONSE.duplicate(); + } }; } } @@ -669,6 +696,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { return streamId; } + @Override + public ByteBuffer getCompletionResponse() { + return SUCCESS_RESPONSE.duplicate(); + } + /** * Write a ByteBuffer to the merged shuffle file. Here we keep track of the length of the * block data written to file. 
In case of failure during writing block to file, we use the @@ -746,20 +778,26 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } /** - * If appShuffleMergePartitionsInfo is null or shuffleMergePartitions is set to - * INDETERMINATE_SHUFFLE_FINALIZED or if the reduceId is not in the map then the - * shuffle is already finalized. Therefore the block push is too late. If - * appShuffleMergePartitionsInfo's shuffleMergeId is + * If appShuffleMergePartitionsInfo's shuffleMergeId is * greater than the request shuffleMergeId then it is a stale block push. */ - private boolean isStaleOrTooLate( + private boolean isStale( + AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo, + int shuffleMergeId) { + return appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId; + } + + /** + * If appShuffleMergePartitionsInfo is null or shuffleMergePartitions is set to + * INDETERMINATE_SHUFFLE_FINALIZED or if the reduceId is not in the map then the + * shuffle is already finalized. Therefore the block push is too late. + */ + private boolean isTooLate( AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo, - int shuffleMergeId, int reduceId) { return null == appShuffleMergePartitionsInfo || INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions || - appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId || - !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId); + !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId); } @Override @@ -779,8 +817,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // to disk as well. This way, we avoid having to buffer the entirety of every blocks in // memory, while still providing the necessary guarantee. synchronized (partitionInfo) { - if (isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId), - partitionInfo.shuffleMergeId, partitionInfo.reduceId)) { + AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId); + if (isStale(info, partitionInfo.shuffleMergeId) || + isTooLate(info, partitionInfo.reduceId)) { deferredBufs = null; return; } @@ -851,12 +890,19 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // was not received yet or this was the latest stage attempt (or latest shuffleMergeId) // generating shuffle output for the shuffle ID. By the time we finish reading this // message, the block request is either stale or too late. We should thus respond - // RpcFailure to the client. - if (isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId), - partitionInfo.shuffleMergeId, partitionInfo.reduceId)) { + // the error code to the client. 
+ AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId); + if (isTooLate(info, partitionInfo.reduceId)) { deferredBufs = null; - throw new RuntimeException(String.format("Block %s is %s", streamId, - ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)); + throw new BlockPushNonFatalFailure( + new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), streamId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_LATE_BLOCK_PUSH)); + } + if (isStale(info, partitionInfo.shuffleMergeId)) { + deferredBufs = null; + throw new BlockPushNonFatalFailure( + new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), streamId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.STALE_BLOCK_PUSH)); } // Check if we can commit this block @@ -905,9 +951,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } else { deferredBufs = null; - throw new RuntimeException(String.format("%s %s to merged shuffle", - ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX, - streamId)); + throw new BlockPushNonFatalFailure( + new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId) + .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg( + streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED)); } } isWriting = false; @@ -915,7 +962,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public void onFailure(String streamId, Throwable throwable) throws IOException { - if (mergeManager.errorHandler.shouldLogError(throwable)) { + if (ERROR_HANDLER.shouldLogError(throwable)) { logger.error("Encountered issue when merging {}", streamId, throwable); } else { logger.debug("Encountered issue when merging {}", streamId, throwable); @@ -926,13 +973,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // an opportunity to write the block data to disk, we should also ignore here. if (isWriting) { synchronized (partitionInfo) { - if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId), - partitionInfo.shuffleMergeId, partitionInfo.reduceId)) { - logger.debug("{} encountered failure", partitionInfo); - partitionInfo.setCurrentMapIndex(-1); - } + AppShuffleMergePartitionsInfo info = + appShuffleInfo.shuffles.get(partitionInfo.shuffleId); + if (!isTooLate(info, partitionInfo.reduceId) && + !isStale(info, partitionInfo.shuffleMergeId)) { + logger.debug("{} encountered failure", partitionInfo); + partitionInfo.setCurrentMapIndex(-1); } } + } isWriting = false; } @@ -1350,10 +1399,4 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { return pos; } } - - public static class StaleBlockPushException extends RuntimeException { - public StaleBlockPushException(String message) { - super(message); - } - } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java new file mode 100644 index 0000000000..0455d67c5a --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
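The split of the old `isStaleOrTooLate` check into `isTooLate` and `isStale` matters because the two cases now map to different return codes. A simplified standalone model of the two predicates follows; the field names mirror the patch, but the class here is a stand-in, not the real `AppShuffleMergePartitionsInfo`:

```java
import java.util.Map;

public class StaleOrLateDemo {
  // Stand-in holding only the fields the predicates read. In the patch, a null
  // shuffleMergePartitions map plays the role of INDETERMINATE_SHUFFLE_FINALIZED.
  static class PartitionsInfo {
    int shuffleMergeId;
    Map<Integer, Object> shuffleMergePartitions;
  }

  // Too late: the shuffle is already finalized (no info for the shuffle, the finalized
  // marker is set, or the reduce partition is gone) => TOO_LATE_BLOCK_PUSH.
  static boolean isTooLate(PartitionsInfo info, int reduceId) {
    return info == null || info.shuffleMergePartitions == null ||
      !info.shuffleMergePartitions.containsKey(reduceId);
  }

  // Stale: a higher shuffleMergeId has already been seen for this shuffle, so this
  // push belongs to an obsolete indeterminate stage attempt => STALE_BLOCK_PUSH.
  static boolean isStale(PartitionsInfo info, int shuffleMergeId) {
    return info.shuffleMergeId > shuffleMergeId;
  }
}
```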
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
new file mode 100644
index 0000000000..0455d67c5a
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+/**
+ * Error code indicating a non-fatal failure of a block push request.
+ * Due to the best-effort nature of push-based shuffle, these failures
+ * do not impact the completion of the block push process. The list of
+ * such errors is in
+ * {@link org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode}.
+ *
+ * @since 3.2.0
+ */
+public class BlockPushReturnCode extends BlockTransferMessage {
+  public final byte returnCode;
+  // Block ID of the block that experiences a non-fatal block push failure.
+  // Will be an empty string for any successfully pushed block.
+  public final String failureBlockId;
+
+  public BlockPushReturnCode(byte returnCode, String failureBlockId) {
+    Preconditions.checkNotNull(BlockPushNonFatalFailure.getReturnCode(returnCode));
+    this.returnCode = returnCode;
+    this.failureBlockId = failureBlockId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.PUSH_BLOCK_RETURN_CODE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(returnCode, failureBlockId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("returnCode", returnCode)
+      .append("failureBlockId", failureBlockId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof BlockPushReturnCode) {
+      BlockPushReturnCode o = (BlockPushReturnCode) other;
+      return returnCode == o.returnCode && Objects.equals(failureBlockId, o.failureBlockId);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1 + Encoders.Strings.encodedLength(failureBlockId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(returnCode);
+    Encoders.Strings.encode(buf, failureBlockId);
+  }
+
+  public static BlockPushReturnCode decode(ByteBuf buf) {
+    byte returnCode = buf.readByte();
+    String failureBlockId = Encoders.Strings.decode(buf);
+    return new BlockPushReturnCode(returnCode, failureBlockId);
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 453791da7b..ad959c7e2e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -49,7 +49,8 @@ public abstract class BlockTransferMessage implements Encodable {
     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
     FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
     PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
-    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17);
+    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17),
+    PUSH_BLOCK_RETURN_CODE(18);
 
     private final byte id;
 
@@ -86,6 +87,7 @@ public abstract class BlockTransferMessage implements Encodable {
         case 15: return FetchShuffleBlockChunks.decode(buf);
         case 16: return DiagnoseCorruption.decode(buf);
         case 17: return CorruptionCause.decode(buf);
+        case 18: return BlockPushReturnCode.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }
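A quick round trip through the new message type, useful for sanity-checking the wire format before turning to the test suites below. This is a sketch exercising only the `encode`/`decode` pair defined above; the block ID is illustrative:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;

public class WireFormatCheck {
  public static void main(String[] args) {
    BlockPushReturnCode original =
      new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), "shufflePush_0_1_0_0");

    // encodedLength() is 1 byte for the code plus the length-prefixed block ID.
    ByteBuf buf = Unpooled.buffer(original.encodedLength());
    original.encode(buf);

    BlockPushReturnCode decoded = BlockPushReturnCode.decode(buf);
    System.out.println(original.equals(decoded));  // true
    buf.release();
  }
}
```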
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index c8066d1e6b..56c9a97c64 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -21,6 +21,9 @@ import java.net.ConnectException;
 
 import org.junit.Test;
 
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+
 import static org.junit.Assert.*;
 
 /**
@@ -31,11 +34,13 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorRetry() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException())));
-    assertTrue(pushHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldRetryError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new ErrorHandler.BlockFetchErrorHandler();
@@ -46,10 +51,12 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorLogging() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new IllegalArgumentException(
-      ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldLogError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new ErrorHandler.BlockFetchErrorHandler();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index d2fd5d9be6..2aadb777be 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -36,7 +36,10 @@ import org.apache.spark.network.buffer.NettyManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
@@ -140,15 +143,16 @@ public class OneForOneBlockPusherSuite {
       BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(header);
       RpcResponseCallback callback = (RpcResponseCallback) invocation.getArguments()[2];
       Map.Entry<String, ManagedBuffer> entry = blockIterator.next();
+      String blockId = entry.getKey();
       ManagedBuffer block = entry.getValue();
       if (block != null && block.nioByteBuffer().capacity() > 0) {
-        callback.onSuccess(header);
+        callback.onSuccess(new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer());
       } else if (block != null) {
-        callback.onFailure(new RuntimeException("Failed " + entry.getKey()
-          + ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX));
+        callback.onSuccess(new BlockPushReturnCode(
+          ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), blockId).toByteBuffer());
       } else {
-        callback.onFailure(new RuntimeException("Quick fail " + entry.getKey()
-          + ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+        callback.onFailure(new BlockPushNonFatalFailure(
+          ReturnCode.TOO_LATE_BLOCK_PUSH, ""));
       }
       assertEquals(msgIterator.next(), message);
       return null;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 36f20a8386..10b6e3c29f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -48,7 +48,10 @@ import static org.junit.Assert.*;
 
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -103,6 +106,18 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test
+  public void testErrorLogging() {
+    ErrorHandler.BlockPushErrorHandler errorHandler = RemoteBlockPushResolver.createErrorHandler();
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
+    assertTrue(errorHandler.shouldLogError(new Throwable()));
+  }
+
   @Test(expected = RuntimeException.class)
   public void testNoIndexFile() {
     try {
@@ -286,7 +301,7 @@ public class RemoteBlockPushResolverSuite {
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
@@ -304,13 +319,15 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
-    } catch (RuntimeException re) {
-      assertEquals("Block shufflePush_0_0_1_0 received after merged shuffle is finalized or stale"
-        + " block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-        + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
       validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
-      throw re;
+      throw e;
     }
   }
 
@@ -348,7 +365,7 @@ public class RemoteBlockPushResolverSuite {
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testCollision() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -362,15 +379,17 @@ public class RemoteBlockPushResolverSuite {
     // Since stream2 didn't get any opportunity it will throw couldn't find opportunity error
     try {
       stream2.onComplete(stream2.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_1_0 to merged shuffle",
-        re.getMessage());
-      throw re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream2.getID());
+      throw e;
     }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -387,14 +406,16 @@ public class RemoteBlockPushResolverSuite {
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
     // Since this stream didn't get any opportunity it will throw couldn't find opportunity error
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // stream 1 now completes
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
@@ -871,7 +892,7 @@ public class RemoteBlockPushResolverSuite {
     removeApplication(TEST_APP);
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -895,14 +916,16 @@ public class RemoteBlockPushResolverSuite {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 2, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
@@ -1072,10 +1095,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-        + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1099,10 +1124,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-        + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1153,10 +1180,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
-        + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index ecaa4f0a2b..e6af76748b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 
-import com.google.common.base.Throwables
-
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
@@ -33,6 +31,8 @@ import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
@@ -78,10 +78,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
           return false
         }
-        val errorStackTraceString = Throwables.getStackTraceAsString(t)
         // If the block is too late or the invalid block push, there is no need to retry it
-        !errorStackTraceString.contains(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)
+        !(t.isInstanceOf[BlockPushNonFatalFailure] &&
+          (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+          t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.STALE_BLOCK_PUSH))
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 26cdad8f94..6f9b5e42fa 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -33,8 +33,9 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.{BlockPushingListener, BlockStoreClient}
-import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.ShuffleBlockPusher.PushRequest
@@ -219,13 +220,15 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldRetryError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException())))
     assert(
-      errorHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert (errorHandler.shouldRetryError(new Throwable()))
   }
 
@@ -233,12 +236,13 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldLogError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
-    assert(!errorHandler.shouldLogError(new RuntimeException(
-      new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
+    assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert(errorHandler.shouldLogError(new Throwable()))
   }
 
@@ -255,9 +259,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
       if (failBlock) {
         failBlock = false
         // Fail the first block with the collision exception.
-        blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-          new IllegalArgumentException(
-            BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
+        blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+          ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))
       } else {
         pushedBlocks += blockId
         blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))
@@ -285,9 +288,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
       if (failBlock) {
         failBlock = false
         // Fail the first block with the too late exception.
-        blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-          new IllegalArgumentException(
-            BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)))
+        blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+          ReturnCode.TOO_LATE_BLOCK_PUSH, ""))
       } else {
         pushedBlocks += blockId
         blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))