diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index f051042a7a..8e7ecf500e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -419,4 +419,11 @@ public class TransportConf { public int ioExceptionsThresholdDuringMerge() { return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); } + + /** + * The application attemptID assigned from Hadoop YARN. + */ + public int appAttemptId() { + return conf.getInt("spark.app.attempt.id", -1); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index f44140b124..63bf787195 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -141,8 +141,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient { RetryingBlockFetcher.BlockFetchStarter blockPushStarter = (inputBlockId, inputListener) -> { TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId) - .start(); + new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId, + inputListener, buffersWithId).start(); }; int maxRetries = conf.maxIORetries(); if (maxRetries > 0) { @@ -168,7 +168,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient { checkInit(); try { TransportClient client = clientFactory.createClient(host, port); - ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer(); + ByteBuffer finalizeShuffleMerge = + new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId).toByteBuffer(); client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java index 6ee95ef0de..b8b32e2755 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java @@ -45,6 +45,7 @@ public class OneForOneBlockPusher { private final TransportClient client; private final String appId; + private final int appAttemptId; private final String[] blockIds; private final BlockFetchingListener listener; private final Map buffers; @@ -52,11 +53,13 @@ public class OneForOneBlockPusher { public OneForOneBlockPusher( TransportClient client, String appId, + int appAttemptId, String[] blockIds, BlockFetchingListener listener, Map buffers) { this.client = client; this.appId = appId; + this.appAttemptId = appAttemptId; this.blockIds = blockIds; this.listener = listener; this.buffers = buffers; @@ -123,8 +126,9 @@ public class OneForOneBlockPusher { throw new IllegalArgumentException( "Unexpected shuffle push block id format: " + blockIds[i]); } - ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]), - Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer(); + ByteBuffer header = + new PushBlockStream(appId, appAttemptId, Integer.parseInt(blockIdParts[1]), + Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer(); client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]), new BlockPushCallback(i, blockIds[i])); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 47d25479d9..f88cfee105 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -28,27 +28,26 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; -import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,14 +72,22 @@ import org.apache.spark.network.util.TransportConf; public class RemoteBlockPushResolver implements MergedShuffleFileManager { private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class); - @VisibleForTesting - static final String MERGE_MANAGER_DIR = "merge_manager"; + public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged"; + public static final String SHUFFLE_META_DELIMITER = ":"; + public static final String MERGE_DIR_KEY = "mergeDir"; + public static final String ATTEMPT_ID_KEY = "attemptId"; + private static final int UNDEFINED_ATTEMPT_ID = -1; - private final ConcurrentMap appsPathInfo; - private final ConcurrentMap> partitions; + /** + * A concurrent hashmap where the key is the applicationId, and the value includes + * all the merged shuffle information for this application. AppShuffleInfo stores + * the application attemptId, merged shuffle local directories and the metadata + * for actively being merged shuffle partitions. + */ + private final ConcurrentMap appsShuffleInfo; - private final Executor directoryCleaner; + private final Executor mergedShuffleCleaner; private final TransportConf conf; private final int minChunkSize; private final int ioExceptionsThresholdDuringMerge; @@ -92,9 +99,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @SuppressWarnings("UnstableApiUsage") public RemoteBlockPushResolver(TransportConf conf) { this.conf = conf; - this.partitions = Maps.newConcurrentMap(); - this.appsPathInfo = Maps.newConcurrentMap(); - this.directoryCleaner = Executors.newSingleThreadExecutor( + this.appsShuffleInfo = new ConcurrentHashMap<>(); + this.mergedShuffleCleaner = Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); @@ -112,34 +118,59 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { this.errorHandler = new ErrorHandler.BlockPushErrorHandler(); } + @VisibleForTesting + protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) { + // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId); + Preconditions.checkArgument(appShuffleInfo != null, + "application " + appId + " is not registered or NM was restarted."); + return appShuffleInfo; + } + /** - * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an - * application, retrieves the associated metadata. If not present and the corresponding merged - * shuffle does not exist, initializes the metadata. + * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies a given shuffle + * partition of an application, retrieves the associated metadata. If not present and the + * corresponding merged shuffle does not exist, initializes the metadata. */ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( - AppShuffleId appShuffleId, + AppShuffleInfo appShuffleInfo, + int shuffleId, int reduceId) { - File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId); - if (!partitions.containsKey(appShuffleId) && dataFile.exists()) { - // If this partition is already finalized then the partitions map will not contain - // the appShuffleId but the data file would exist. In that case the block is considered late. + File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId); + ConcurrentMap> partitions = + appShuffleInfo.partitions; + Map shufflePartitions = + partitions.compute(shuffleId, (id, map) -> { + if (map == null) { + // If this partition is already finalized then the partitions map will not contain the + // shuffleId but the data file would exist. In that case the block is considered late. + if (dataFile.exists()) { + return null; + } + return new ConcurrentHashMap<>(); + } else { + return map; + } + }); + if (shufflePartitions == null) { return null; } - Map shufflePartitions = - partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap()); + return shufflePartitions.computeIfAbsent(reduceId, key -> { // It only gets here when the key is not present in the map. This could either // be the first time the merge manager receives a pushed block for a given application // shuffle partition, or after the merged shuffle file is finalized. We handle these // two cases accordingly by checking if the file already exists. - File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); - File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); + File metaFile = + appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId); try { if (dataFile.exists()) { return null; } else { - return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); + return newAppShufflePartitionInfo( + appShuffleInfo.appId, shuffleId, reduceId, dataFile, indexFile, metaFile); } } catch (IOException e) { logger.error( @@ -148,26 +179,28 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { indexFile.getAbsolutePath(), metaFile.getAbsolutePath()); throw new RuntimeException( String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s " - + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e); + + "reduceId %s", appShuffleInfo.appId, shuffleId, reduceId), e); } }); } @VisibleForTesting AppShufflePartitionInfo newAppShufflePartitionInfo( - AppShuffleId appShuffleId, + String appId, + int shuffleId, int reduceId, File dataFile, File indexFile, File metaFile) throws IOException { - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, + return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile, new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile)); } @Override public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) { - AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); - File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); if (!indexFile.exists()) { throw new RuntimeException(String.format( "Merged shuffle index file %s not found", indexFile.getPath())); @@ -175,7 +208,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { int size = (int) indexFile.length(); // First entry is the zero offset int numChunks = (size / Long.BYTES) - 1; - File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId); + File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId); if (!metaFile.exists()) { throw new RuntimeException(String.format("Merged shuffle meta file %s not found", metaFile.getPath())); @@ -190,13 +223,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @SuppressWarnings("UnstableApiUsage") @Override public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId) { - AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); - File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId); if (!dataFile.exists()) { throw new RuntimeException(String.format("Merged shuffle data file %s not found", dataFile.getPath())); } - File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); try { // If we get here, the merged shuffle file should have been properly finalized. Thus we can // use the file length to determine the size of the merged shuffle block. @@ -210,76 +244,51 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } - /** - * The logic here is consistent with - * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile( - * org.apache.spark.storage.BlockId, scala.Option)]] - */ - private File getFile(String appId, String filename) { - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); - logger.debug("Get merged file {}", targetFile.getAbsolutePath()); - return targetFile; - } - - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); - } - - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); - } - - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); - } - @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); - return activeLocalDirs; + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + return appShuffleInfo.appPathsInfo.activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator>> iterator = - partitions.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - AppShuffleId appShuffleId = entry.getKey(); - if (appId.equals(appShuffleId.appId)) { - iterator.remove(); - for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) { + AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId); + if (null != appShuffleInfo) { + mergedShuffleCleaner.execute( + () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs)); + } + } + + + /** + * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. + * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. + * The cleanup will be executed in a separate thread. + */ + @VisibleForTesting + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + for (Map partitionMap : appShuffleInfo.partitions.values()) { + for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + synchronized (partitionInfo) { partitionInfo.closeAllFiles(); } } } if (cleanupLocalDirs) { - Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs) - .map(dir -> Paths.get(dir)).toArray(Path[]::new); - directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + deleteExecutorDirs(appShuffleInfo); } } /** - * Serially delete local dirs, executed in a separate thread. + * Serially delete local dirs. */ @VisibleForTesting - void deleteExecutorDirs(Path[] dirs) { + void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); for (Path localDir : dirs) { try { if (Files.exists(localDir)) { @@ -294,10 +303,22 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + final String streamId = String.format("%s_%d_%d_%d", + OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex, + msg.reduceId); + if (appShuffleInfo.attemptId != msg.appAttemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + // TODO: [SPARK-35548] Client should be updated to handle this error. + throw new IllegalArgumentException( + String.format("The attempt id %s in this PushBlockStream message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } // Retrieve merged shuffle file metadata - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); AppShufflePartitionInfo partitionInfoBeforeCheck = - getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId); + getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId, msg.reduceId); // Here partitionInfo will be null in 2 cases: // 1) The request is received for a block that has already been merged, this is possible due // to the retry logic. @@ -338,11 +359,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; - final String streamId = String.format("%s_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex, - msg.reduceId); if (partitionInfo != null) { - return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex); + return new PushBlockStreamCallback( + this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex); } else { // For a duplicate block or a block which is late, respond back with a callback that handles // them differently. @@ -377,24 +396,31 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId); - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); - Map shufflePartitions = partitions.get(appShuffleId); + logger.info("Finalizing shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.appAttemptId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + if (appShuffleInfo.attemptId != msg.appAttemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + // TODO: [SPARK-35548] Client should be updated to handle this error. + throw new IllegalArgumentException( + String.format("The attempt id %s in this FinalizeShuffleMerge message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } + Map shufflePartitions = + appShuffleInfo.partitions.remove(msg.shuffleId); MergeStatuses mergeStatuses; if (shufflePartitions == null || shufflePartitions.isEmpty()) { mergeStatuses = new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]); } else { - Collection partitionsToFinalize = shufflePartitions.values(); - List bitmaps = new ArrayList<>(partitionsToFinalize.size()); - List reduceIds = new ArrayList<>(partitionsToFinalize.size()); - List sizes = new ArrayList<>(partitionsToFinalize.size()); - Iterator partitionsIter = partitionsToFinalize.iterator(); - while (partitionsIter.hasNext()) { - AppShufflePartitionInfo partition = partitionsIter.next(); + List bitmaps = new ArrayList<>(shufflePartitions.size()); + List reduceIds = new ArrayList<>(shufflePartitions.size()); + List sizes = new ArrayList<>(shufflePartitions.size()); + for (AppShufflePartitionInfo partition: shufflePartitions.values()) { synchronized (partition) { try { // This can throw IOException which will marks this shuffle partition as not merged. @@ -403,13 +429,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { - logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); + logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); - // The partition should be removed after the files are written so that any new stream - // for the same reduce partition will see that the data file exists. - partitionsIter.remove(); } } } @@ -417,8 +440,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } - partitions.remove(appShuffleId); - logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); + logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.appAttemptId); return mergeStatuses; } @@ -426,15 +449,68 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " - + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); + + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), + executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); + } + String shuffleManagerMeta = executorInfo.shuffleManager; + if (shuffleManagerMeta.contains(SHUFFLE_META_DELIMITER)) { + String mergeDirInfo = + shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER) + 1); + try { + ObjectMapper mapper = new ObjectMapper(); + TypeReference> typeRef + = new TypeReference>(){}; + Map metaMap = mapper.readValue(mergeDirInfo, typeRef); + String mergeDir = metaMap.get(MERGE_DIR_KEY); + int attemptId = Integer.valueOf( + metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID))); + if (mergeDir == null) { + throw new IllegalArgumentException( + String.format("Failed to get the merge directory information from the " + + "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta)); + } + if (attemptId == UNDEFINED_ATTEMPT_ID) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> + new AppShuffleInfo( + appId, UNDEFINED_ATTEMPT_ID, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDir, executorInfo.subDirsPerLocalDir) + )); + } else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. + // The first ExecutorRegister message from the same application attempt wil register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + AtomicReference originalAppShuffleInfo = new AtomicReference<>(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { + if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) { + originalAppShuffleInfo.set(appShuffleInfo); + appShuffleInfo = + new AppShuffleInfo( + appId, attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDir, executorInfo.subDirsPerLocalDir)); + } + return appShuffleInfo; + }); + if (originalAppShuffleInfo.get() != null) { + AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get(); + logger.warn("Cleanup shuffle info and merged shuffle files for {}_{} as new " + + "application attempt registered", appId, appShuffleInfo.attemptId); + mergedShuffleCleaner.execute( + () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, true)); + } + } + } catch (JsonProcessingException e) { + logger.warn("Failed to get the merge directory information from ExecutorShuffleInfo: ", e); + } + } else { + logger.warn("ExecutorShuffleInfo does not have the expected merge directory information"); } - appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, executorInfo.localDirs, - executorInfo.subDirsPerLocalDir)); - } - private static String generateFileName(AppShuffleId appShuffleId, int reduceId) { - return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appShuffleId.appId, - appShuffleId.shuffleId, reduceId); } /** @@ -443,6 +519,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { static class PushBlockStreamCallback implements StreamCallbackWithID { private final RemoteBlockPushResolver mergeManager; + private final AppShuffleInfo appShuffleInfo; private final String streamId; private final int mapIndex; private final AppShufflePartitionInfo partitionInfo; @@ -457,12 +534,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private PushBlockStreamCallback( RemoteBlockPushResolver mergeManager, + AppShuffleInfo appShuffleInfo, String streamId, AppShufflePartitionInfo partitionInfo, int mapIndex) { - this.mergeManager = Preconditions.checkNotNull(mergeManager); + Preconditions.checkArgument(mergeManager != null); + this.mergeManager = mergeManager; + Preconditions.checkArgument(appShuffleInfo != null); + this.appShuffleInfo = appShuffleInfo; this.streamId = streamId; - this.partitionInfo = Preconditions.checkNotNull(partitionInfo); + Preconditions.checkArgument(partitionInfo != null); + this.partitionInfo = partitionInfo; this.mapIndex = mapIndex; abortIfNecessary(); } @@ -482,7 +564,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { while (buf.hasRemaining()) { long updatedPos = partitionInfo.getDataFilePos() + length; logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } @@ -567,7 +649,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // memory, while still providing the necessary guarantee. synchronized (partitionInfo) { Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); // If the partitionInfo corresponding to (appId, shuffleId, reduceId) is no longer present // then it means that the shuffle merge has already been finalized. We should thus ignore // the data and just drain the remaining bytes of this message. This check should be @@ -587,7 +669,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } abortIfNecessary(); logger.trace("{} shuffleId {} reduceId {} onData writable", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); if (partitionInfo.getCurrentMapIndex() < 0) { partitionInfo.setCurrentMapIndex(mapIndex); @@ -609,7 +691,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } else { logger.trace("{} shuffleId {} reduceId {} onData deferred", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); // If we cannot write to disk, we buffer the current block chunk in memory so it could // potentially be written to disk later. We take our best effort without guarantee @@ -644,10 +726,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public void onComplete(String streamId) throws IOException { synchronized (partitionInfo) { logger.trace("{} shuffleId {} reduceId {} onComplete invoked", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); // When this request initially got to the server, the shuffle merge finalize request // was not received yet. By the time we finish reading this message, the shuffle merge // however is already finalized. We should thus respond RpcFailure to the client. @@ -724,10 +806,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { if (isWriting) { synchronized (partitionInfo) { Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) { logger.debug("{} shuffleId {} reduceId {} encountered failure", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); partitionInfo.setCurrentMapIndex(-1); } @@ -742,63 +824,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } - /** - * ID that uniquely identifies a shuffle for an application. This is used as a key in - * {@link #partitions}. - */ - public static class AppShuffleId { - public final String appId; - public final int shuffleId; - - AppShuffleId(String appId, int shuffleId) { - this.appId = appId; - this.shuffleId = shuffleId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AppShuffleId that = (AppShuffleId) o; - return shuffleId == that.shuffleId && Objects.equal(appId, that.appId); - } - - @Override - public int hashCode() { - return Objects.hashCode(appId, shuffleId); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("appId", appId) - .append("shuffleId", shuffleId) - .toString(); - } - } - /** Metadata tracked for an actively merged shuffle partition */ public static class AppShufflePartitionInfo { - private final AppShuffleId appShuffleId; + private final String appId; + private final int shuffleId; private final int reduceId; // The merged shuffle data file channel - public FileChannel dataChannel; + public final FileChannel dataChannel; + // The index file for a particular merged shuffle contains the chunk offsets. + private final MergeShuffleFile indexFile; + // The meta file for a particular merged shuffle contains all the map indices that belong to + // every chunk. The entry per chunk is a serialized bitmap. + private final MergeShuffleFile metaFile; // Location offset of the last successfully merged block for this shuffle partition private long dataFilePos; // Track the map index whose block is being merged for this shuffle partition private int currentMapIndex; // Bitmap tracking which mapper's blocks have been merged for this shuffle partition private RoaringBitmap mapTracker; - // The index file for a particular merged shuffle contains the chunk offsets. - private MergeShuffleFile indexFile; - // The meta file for a particular merged shuffle contains all the map indices that belong to - // every chunk. The entry per chunk is a serialized bitmap. - private MergeShuffleFile metaFile; // The offset for the last chunk tracked in the index file for this shuffle partition private long lastChunkOffset; private int lastMergedMapIndex = -1; @@ -808,12 +852,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private boolean indexMetaUpdateFailed; AppShufflePartitionInfo( - AppShuffleId appShuffleId, + String appId, + int shuffleId, int reduceId, File dataFile, MergeShuffleFile indexFile, MergeShuffleFile metaFile) throws IOException { - this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id"); + Preconditions.checkArgument(appId != null, "app id is null"); + this.appId = appId; + this.shuffleId = shuffleId; this.reduceId = reduceId; this.dataChannel = new FileOutputStream(dataFile).getChannel(); this.indexFile = indexFile; @@ -831,8 +878,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } public void setDataFilePos(long dataFilePos) { - logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos); + logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appId, + shuffleId, reduceId, this.dataFilePos, dataFilePos); this.dataFilePos = dataFilePos; } @@ -842,7 +889,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { void setCurrentMapIndex(int mapIndex) { logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, currentMapIndex, mapIndex); + appId, shuffleId, reduceId, currentMapIndex, mapIndex); this.currentMapIndex = mapIndex; } @@ -851,8 +898,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } void blockMerged(int mapIndex) { - logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, mapIndex); + logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appId, + shuffleId, reduceId, mapIndex); mapTracker.add(mapIndex); chunkTracker.add(mapIndex); lastMergedMapIndex = mapIndex; @@ -871,7 +918,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { try { logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset); + appId, shuffleId, reduceId, this.lastChunkOffset, chunkOffset); if (indexMetaUpdateFailed) { indexFile.getChannel().position(indexFile.getPos()); } @@ -885,8 +932,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { this.lastChunkOffset = chunkOffset; indexMetaUpdateFailed = false; } catch (IOException ioe) { - logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId, - appShuffleId.shuffleId, reduceId); + logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appId, + shuffleId, reduceId); indexMetaUpdateFailed = true; // Any exception here is propagated to the caller and the caller can decide whether to // abort or not. @@ -900,7 +947,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } chunkTracker.add(mapIndex); logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); + appId, shuffleId, reduceId, mapIndex); if (indexMetaUpdateFailed) { metaFile.getChannel().position(metaFile.getPos()); } @@ -934,35 +981,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } void closeAllFiles() { - if (dataChannel != null) { - try { + try { + if (dataChannel.isOpen()) { dataChannel.close(); - } catch (IOException ioe) { - logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - dataChannel = null; } + } catch (IOException ioe) { + logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } - if (metaFile != null) { - try { - metaFile.close(); - } catch (IOException ioe) { - logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - metaFile = null; - } + try { + metaFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } - if (indexFile != null) { - try { - indexFile.close(); - } catch (IOException ioe) { - logger.warn("Error closing index file for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - indexFile = null; - } + try { + indexFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing index file for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } } @@ -1003,14 +1040,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private AppPathsInfo( String appId, String[] localDirs, + String mergeDirectory, int subDirsPerLocalDir) { activeLocalDirs = Arrays.stream(localDirs) .map(localDir -> // Merge directory is created at the same level as block-manager directory. The list of - // local directories that we get from executorShuffleInfo are paths of each - // block-manager directory. To find out the merge directory location, we first find the - // parent dir and then append the "merger_manager" directory to it. - Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath()) + // local directories that we get from ExecutorShuffleInfo are paths of each + // block-manager directory. The mergeDirectory is the merge directory name that we get + // from ExecutorShuffleInfo. To find out the merge directory location, we first find the + // parent dir of the block-manager directory and then append merge directory name to it. + Paths.get(localDir).getParent().resolve(mergeDirectory).toFile().getPath()) .toArray(String[]::new); this.subDirsPerLocalDir = subDirsPerLocalDir; if (logger.isInfoEnabled()) { @@ -1020,10 +1059,76 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } + /** Merged Shuffle related information tracked for a specific application attempt */ + public static class AppShuffleInfo { + + private final String appId; + private final int attemptId; + private final AppPathsInfo appPathsInfo; + private final ConcurrentMap> partitions; + + AppShuffleInfo( + String appId, + int attemptId, + AppPathsInfo appPathsInfo) { + this.appId = appId; + this.attemptId = attemptId; + this.appPathsInfo = appPathsInfo; + partitions = new ConcurrentHashMap<>(); + } + + @VisibleForTesting + public ConcurrentMap> getPartitions() { + return partitions; + } + + /** + * The logic here is consistent with + * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile( + * org.apache.spark.storage.BlockId, scala.Option)]] + */ + private File getFile(String filename) { + // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart + File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, + appPathsInfo.subDirsPerLocalDir, filename); + logger.debug("Get merged file {}", targetFile.getAbsolutePath()); + return targetFile; + } + + private String generateFileName( + String appId, + int shuffleId, + int reduceId) { + return String.format( + "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, reduceId); + } + + public File getMergedShuffleDataFile( + int shuffleId, + int reduceId) { + String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId)); + return getFile(fileName); + } + + public File getMergedShuffleIndexFile( + int shuffleId, + int reduceId) { + String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId)); + return getFile(indexName); + } + + public File getMergedShuffleMetaFile( + int shuffleId, + int reduceId) { + String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId)); + return getFile(metaName); + } + } + @VisibleForTesting static class MergeShuffleFile { - private FileChannel channel; - private DataOutputStream dos; + private final FileChannel channel; + private final DataOutputStream dos; private long pos; @VisibleForTesting @@ -1044,11 +1149,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } void close() throws IOException { - try { + if (channel.isOpen()) { dos.close(); - } finally { - dos = null; - channel = null; } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java index b4e7bc409d..f123ccb663 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java @@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable { public final String[] localDirs; /** Number of subdirectories created within each localDir. */ public final int subDirsPerLocalDir; - /** Shuffle manager (SortShuffleManager) that the executor is using. */ + /** + * Shuffle manager (SortShuffleManager) that the executor is using. + * If this string contains semicolon, it will also include the meta information + * for push based shuffle in JSON format. Example of the string with semicolon would be: + * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1} + */ public final String shuffleManager; @JsonCreator diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java index 31efbb727b..f6ab78b1ab 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java @@ -32,12 +32,15 @@ import org.apache.spark.network.protocol.Encoders; */ public class FinalizeShuffleMerge extends BlockTransferMessage { public final String appId; + public final int appAttemptId; public final int shuffleId; public FinalizeShuffleMerge( String appId, + int appAttemptId, int shuffleId) { this.appId = appId; + this.appAttemptId = appAttemptId; this.shuffleId = shuffleId; } @@ -48,13 +51,14 @@ public class FinalizeShuffleMerge extends BlockTransferMessage { @Override public int hashCode() { - return Objects.hashCode(appId, shuffleId); + return Objects.hashCode(appId, appAttemptId, shuffleId); } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("appId", appId) + .append("attemptId", appAttemptId) .append("shuffleId", shuffleId) .toString(); } @@ -64,6 +68,7 @@ public class FinalizeShuffleMerge extends BlockTransferMessage { if (other != null && other instanceof FinalizeShuffleMerge) { FinalizeShuffleMerge o = (FinalizeShuffleMerge) other; return Objects.equal(appId, o.appId) + && appAttemptId == appAttemptId && shuffleId == o.shuffleId; } return false; @@ -71,18 +76,20 @@ public class FinalizeShuffleMerge extends BlockTransferMessage { @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 4; + return Encoders.Strings.encodedLength(appId) + 4 + 4; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeInt(appAttemptId); buf.writeInt(shuffleId); } public static FinalizeShuffleMerge decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); + int attemptId = buf.readInt(); int shuffleId = buf.readInt(); - return new FinalizeShuffleMerge(appId, shuffleId); + return new FinalizeShuffleMerge(appId, attemptId, shuffleId); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java index 559f88fc4e..d5e1cf2464 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java @@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle.protocol; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; + import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -34,6 +35,7 @@ import org.apache.spark.network.protocol.Encoders; */ public class PushBlockStream extends BlockTransferMessage { public final String appId; + public final int appAttemptId; public final int shuffleId; public final int mapIndex; public final int reduceId; @@ -41,8 +43,15 @@ public class PushBlockStream extends BlockTransferMessage { // blocks to be pushed. public final int index; - public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) { + public PushBlockStream( + String appId, + int appAttemptId, + int shuffleId, + int mapIndex, + int reduceId, + int index) { this.appId = appId; + this.appAttemptId = appAttemptId; this.shuffleId = shuffleId; this.mapIndex = mapIndex; this.reduceId = reduceId; @@ -56,13 +65,14 @@ public class PushBlockStream extends BlockTransferMessage { @Override public int hashCode() { - return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index); + return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex , reduceId, index); } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("appId", appId) + .append("attemptId", appAttemptId) .append("shuffleId", shuffleId) .append("mapIndex", mapIndex) .append("reduceId", reduceId) @@ -75,6 +85,7 @@ public class PushBlockStream extends BlockTransferMessage { if (other != null && other instanceof PushBlockStream) { PushBlockStream o = (PushBlockStream) other; return Objects.equal(appId, o.appId) + && appAttemptId == o.appAttemptId && shuffleId == o.shuffleId && mapIndex == o.mapIndex && reduceId == o.reduceId @@ -85,12 +96,13 @@ public class PushBlockStream extends BlockTransferMessage { @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 16; + return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeInt(appAttemptId); buf.writeInt(shuffleId); buf.writeInt(mapIndex); buf.writeInt(reduceId); @@ -99,10 +111,11 @@ public class PushBlockStream extends BlockTransferMessage { public static PushBlockStream decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); + int attemptId = buf.readInt(); int shuffleId = buf.readInt(); int mapIdx = buf.readInt(); int reduceId = buf.readInt(); int index = buf.readInt(); - return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index); + return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, index); } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java index dc41e957f0..00756b1b62 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java @@ -243,7 +243,7 @@ public class ExternalBlockHandlerSuite { public void testFinalizeShuffleMerge() throws IOException { RpcResponseCallback callback = mock(RpcResponseCallback.class); - FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0); + FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0); RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2); MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap}, new int[]{3}, new long[]{30}); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java index 46a0f6cf42..e41198f8ae 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java @@ -51,7 +51,7 @@ public class OneForOneBlockPusherSuite { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0))); verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); } @@ -67,9 +67,9 @@ public class OneForOneBlockPusherSuite { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any()); @@ -87,9 +87,9 @@ public class OneForOneBlockPusherSuite { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any()); @@ -107,9 +107,9 @@ public class OneForOneBlockPusherSuite { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any()); @@ -130,7 +130,7 @@ public class OneForOneBlockPusherSuite { TransportClient client = mock(TransportClient.class); BlockFetchingListener listener = mock(BlockFetchingListener.class); OneForOneBlockPusher pusher = - new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks); + new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks); Iterator> blockIterator = blocks.entrySet().iterator(); Iterator msgIterator = expectMessages.iterator(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 565d433ff3..2a73aa56b2 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -22,11 +22,13 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -61,6 +63,17 @@ public class RemoteBlockPushResolverSuite { private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class); private final String TEST_APP = "testApp"; + private final String MERGE_DIRECTORY = "merge_manager"; + private final int NO_ATTEMPT_ID = -1; + private final int ATTEMPT_ID_1 = 1; + private final int ATTEMPT_ID_2 = 2; + private final String MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDir\": \"merge_manager\"}"; + private final String MERGE_DIRECTORY_META_1 = + "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}"; + private final String MERGE_DIRECTORY_META_2 = + "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}"; + private final String INVALID_MERGE_DIRECTORY_META = + "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"}"; private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401"; private TransportConf conf; @@ -74,7 +87,7 @@ public class RemoteBlockPushResolverSuite { ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4")); conf = new TransportConf("shuffle", provider); pushResolver = new RemoteBlockPushResolver(conf); - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } @After @@ -106,9 +119,9 @@ public class RemoteBlockPushResolverSuite { new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4])), new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[5])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}}); @@ -122,9 +135,9 @@ public class RemoteBlockPushResolverSuite { new PushBlock(0, 2, 0, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 3, 0, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {13}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, meta, new int[]{5, 5, 3}, new int[][]{{0, 1}, {2}, {3}}); @@ -138,9 +151,9 @@ public class RemoteBlockPushResolverSuite { new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0, 1}, new long[] {5, 8}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, meta, new int[]{5}, new int[][]{{0, 1}}); @@ -149,10 +162,12 @@ public class RemoteBlockPushResolverSuite { @Test public void testDeferredBufsAreWrittenDuringOnData() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); // stream 1 now completes @@ -161,7 +176,7 @@ public class RemoteBlockPushResolverSuite { // stream 2 has more data and then completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}}); } @@ -169,10 +184,12 @@ public class RemoteBlockPushResolverSuite { @Test public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); @@ -181,7 +198,7 @@ public class RemoteBlockPushResolverSuite { stream1.onComplete(stream1.getID()); // stream 2 now completes completes stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}}); } @@ -189,17 +206,19 @@ public class RemoteBlockPushResolverSuite { @Test public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); // This should be ignored stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); } @@ -207,10 +226,12 @@ public class RemoteBlockPushResolverSuite { @Test public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); // This should be ignored stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); @@ -219,7 +240,7 @@ public class RemoteBlockPushResolverSuite { stream1.onComplete(stream1.getID()); // stream 2 now completes completes stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); } @@ -227,10 +248,11 @@ public class RemoteBlockPushResolverSuite { @Test public void testFailureAfterData() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @@ -238,12 +260,13 @@ public class RemoteBlockPushResolverSuite { @Test public void testFailureAfterMultipleDataBlocks() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @@ -251,39 +274,39 @@ public class RemoteBlockPushResolverSuite { @Test public void testFailureAfterComplete() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onComplete(stream.getID()); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); } - @Test (expected = RuntimeException.class) - public void testTooLateArrival() throws IOException { + @Test(expected = RuntimeException.class) + public void testBlockReceivedAfterMergeFinalize() throws IOException { ByteBuffer[] blocks = new ByteBuffer[]{ ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5]) }; StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); for (ByteBuffer block : blocks) { stream.onData(stream.getID(), block); } stream.onComplete(stream.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4])); try { stream1.onComplete(stream1.getID()); } catch (RuntimeException re) { assertEquals( - "Block shufflePush_0_1_0 received after merged shuffle is finalized", - re.getMessage()); + "Block shufflePush_0_1_0 received after merged shuffle is finalized", re.getMessage()); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); throw re; @@ -292,28 +315,31 @@ public class RemoteBlockPushResolverSuite { @Test public void testIncompleteStreamsAreOverwritten() throws IOException { - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); byte[] expectedBytes = new byte[4]; ThreadLocalRandom.current().nextBytes(expectedBytes); StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); byte[] data = new byte[10]; ThreadLocalRandom.current().nextBytes(data); stream1.onData(stream1.getID(), ByteBuffer.wrap(data)); // There is a failure stream1.onFailure(stream1.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); ByteBuffer nextBuf= ByteBuffer.wrap(expectedBytes, 0, 2); stream2.onData(stream2.getID(), nextBuf); stream2.onComplete(stream2.getID()); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); nextBuf = ByteBuffer.wrap(expectedBytes, 2, 2); stream3.onData(stream3.getID(), nextBuf); stream3.onComplete(stream3.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 2}}); FileSegmentManagedBuffer mb = @@ -321,13 +347,15 @@ public class RemoteBlockPushResolverSuite { assertArrayEquals(expectedBytes, mb.nioByteBuffer().array()); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testCollision() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // Since stream2 didn't get any opportunity it will throw couldn't find opportunity error @@ -341,17 +369,20 @@ public class RemoteBlockPushResolverSuite { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // There is a failure with stream2 stream2.onFailure(stream2.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); // This should be deferred stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5])); // Since this stream didn't get any opportunity it will throw couldn't find opportunity error @@ -368,7 +399,7 @@ public class RemoteBlockPushResolverSuite { stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4}, new int[][] {{0}}); if (failedEx != null) { @@ -376,28 +407,83 @@ public class RemoteBlockPushResolverSuite { } } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testUpdateLocalDirsOnlyOnce() throws IOException { String testApp = "updateLocalDirsOnlyOnceTest"; Path[] activeLocalDirs = createLocalDirs(1); - registerExecutor(testApp, prepareLocalDirs(activeLocalDirs)); + registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY), + MERGE_DIRECTORY_META); assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( activeLocalDirs[0].toFile().getPath())); - // Any later executor register from the same application should not change the active local - // dirs list + // Any later executor register from the same application attempt should not change the active + // local dirs list Path[] updatedLocalDirs = localDirs; - registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs)); + registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs, MERGE_DIRECTORY), + MERGE_DIRECTORY_META); assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( activeLocalDirs[0].toFile().getPath())); removeApplication(testApp); try { pushResolver.getMergedBlockDirs(testApp); - } catch (Throwable e) { - assertTrue(e.getMessage() - .startsWith("application " + testApp + " is not registered or NM was restarted.")); - Throwables.propagate(e); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + "application " + testApp + " is not registered or NM was restarted."); + throw e; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testExecutorRegisterWithInvalidJsonForPushShuffle() throws IOException { + String testApp = "executorRegisterWithInvalidShuffleManagerMeta"; + Path[] activeLocalDirs = createLocalDirs(1); + try { + registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY), + INVALID_MERGE_DIRECTORY_META); + } catch (IllegalArgumentException re) { + assertEquals( + "Failed to get the merge directory information from the shuffleManagerMeta " + + "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"} in " + + "executor registration message", re.getMessage()); + throw re; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testExecutorRegistrationFromTwoAppAttempts() throws IOException { + String testApp = "testExecutorRegistrationFromTwoAppAttempts"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); + assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt1LocalDirs[0].toFile().getPath())); + // Any later executor register from the same application attempt should not change the active + // local dirs list + Path[] attempt1UpdatedLocalDirs = localDirs; + registerExecutor(testApp, + prepareLocalDirs(attempt1UpdatedLocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); + assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt1LocalDirs[0].toFile().getPath())); + // But a new attempt from the same application can change the active local dirs list + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 2); + assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt2LocalDirs[0].toFile().getPath())); + removeApplication(testApp); + try { + pushResolver.getMergedBlockDirs(testApp); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + "application " + testApp + " is not registered or NM was restarted."); + throw e; } } @@ -407,17 +493,18 @@ public class RemoteBlockPushResolverSuite { Semaphore deleted = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf) { @Override - void deleteExecutorDirs(Path[] dirs) { - super.deleteExecutorDirs(dirs); + void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) { + super.deleteExecutorDirs(appShuffleInfo); deleted.release(); } }; + Path[] activeDirs = createLocalDirs(1); - registerExecutor(testApp, prepareLocalDirs(activeDirs)); + registerExecutor(testApp, prepareLocalDirs(activeDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); PushBlock[] pushBlocks = new PushBlock[] { new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4]))}; - pushBlockHelper(testApp, pushBlocks); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 0)); + pushBlockHelper(testApp, NO_ATTEMPT_ID, pushBlocks); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 0); validateChunks(testApp, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); String[] mergeDirs = pushResolver.getMergedBlockDirs(testApp); @@ -435,7 +522,7 @@ public class RemoteBlockPushResolverSuite { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -443,7 +530,7 @@ public class RemoteBlockPushResolverSuite { TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index file will be unsuccessful. @@ -452,12 +539,12 @@ public class RemoteBlockPushResolverSuite { // Restore the index stream so it can write successfully again. testIndexFile.restore(); StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); callback3.onComplete(callback3.getID()); assertEquals("index position", 24, testIndexFile.getPos()); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); @@ -468,7 +555,7 @@ public class RemoteBlockPushResolverSuite { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -476,7 +563,7 @@ public class RemoteBlockPushResolverSuite { TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index file will be unsuccessful. @@ -486,7 +573,7 @@ public class RemoteBlockPushResolverSuite { // Restore the index stream so it can write successfully again. testIndexFile.restore(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); assertEquals("index position", 24, testIndexFile.getPos()); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); @@ -498,7 +585,7 @@ public class RemoteBlockPushResolverSuite { useTestFiles(false, true); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -507,7 +594,7 @@ public class RemoteBlockPushResolverSuite { long metaPosBeforeClose = testMetaFile.getPos(); testMetaFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index and meta file will be unsuccessful. @@ -517,13 +604,13 @@ public class RemoteBlockPushResolverSuite { // Restore the meta stream so it can write successfully again. testMetaFile.restore(); StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); callback3.onComplete(callback3.getID()); assertEquals("index position", 24, partitionInfo.getIndexFile().getPos()); assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); @@ -534,7 +621,7 @@ public class RemoteBlockPushResolverSuite { useTestFiles(false, true); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -543,7 +630,7 @@ public class RemoteBlockPushResolverSuite { long metaPosBeforeClose = testMetaFile.getPos(); testMetaFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index and meta file will be unsuccessful. @@ -554,7 +641,7 @@ public class RemoteBlockPushResolverSuite { // Restore the meta stream so it can write successfully again. testMetaFile.restore(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); assertEquals("index position", 24, indexFile.getPos()); assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); @@ -562,11 +649,11 @@ public class RemoteBlockPushResolverSuite { validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testIOExceptionsExceededThreshold() throws IOException { RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); callback.onComplete(callback.getID()); @@ -575,7 +662,7 @@ public class RemoteBlockPushResolverSuite { for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2])); } catch (IOException ioe) { @@ -588,7 +675,7 @@ public class RemoteBlockPushResolverSuite { try { RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0)); callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1])); } catch (Throwable t) { assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", @@ -597,12 +684,12 @@ public class RemoteBlockPushResolverSuite { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); callback.onComplete(callback.getID()); @@ -611,7 +698,7 @@ public class RemoteBlockPushResolverSuite { for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. callback1.onComplete(callback1.getID()); @@ -622,7 +709,7 @@ public class RemoteBlockPushResolverSuite { try { RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4])); callback2.onComplete(callback2.getID()); } catch (Throwable t) { @@ -632,7 +719,7 @@ public class RemoteBlockPushResolverSuite { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testRequestForAbortedShufflePartitionThrowsException() { try { testIOExceptionsDuringMetaUpdateIncreasesExceptionCount(); @@ -641,7 +728,7 @@ public class RemoteBlockPushResolverSuite { } try { pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 10, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 10, 0, 0)); } catch (Throwable t) { assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0", t.getMessage()); @@ -649,19 +736,19 @@ public class RemoteBlockPushResolverSuite { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testPendingBlockIsAbortedImmediately() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); for (int i = 1; i < 6; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. @@ -682,19 +769,19 @@ public class RemoteBlockPushResolverSuite { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. @@ -706,7 +793,7 @@ public class RemoteBlockPushResolverSuite { assertEquals(4, partitionInfo.getNumIOExceptions()); RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, 1, 0, 5, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This is deferred callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); @@ -738,10 +825,10 @@ public class RemoteBlockPushResolverSuite { new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2])); callback.onComplete(callback.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); @@ -749,7 +836,7 @@ public class RemoteBlockPushResolverSuite { // Close the index file so truncate throws IOException testIndexFile.close(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {1}, new long[] {8}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1); validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}}); @@ -758,46 +845,53 @@ public class RemoteBlockPushResolverSuite { @Test public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onFailure(stream1.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // On failure on stream1 gets invoked again and should cause no interference stream1.onFailure(stream1.getID(), new RuntimeException("2nd forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 3, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 3, 0, 0)); // This should be deferred as stream 2 is still the active stream stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); // Stream 2 writes more and completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); stream2.onComplete(stream2.getID()); stream3.onComplete(stream3.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] {{1},{3}}); removeApplication(TEST_APP); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); StreamCallbackWithID stream1Duplicate = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // Should not change the current map id of the reduce partition stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); // This should be deferred as stream 2 is still the active stream stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); RuntimeException failedEx = null; @@ -812,7 +906,7 @@ public class RemoteBlockPushResolverSuite { // Stream 2 writes more and completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 1}}); removeApplication(TEST_APP); @@ -821,20 +915,165 @@ public class RemoteBlockPushResolverSuite { } } + @Test(expected = IllegalArgumentException.class) + public void testPushBlockFromPreviousAttemptIsRejected() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "testPushBlockFromPreviousAttemptIsRejected"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + Map> partitions = + appShuffleInfo.getPartitions(); + for (Map partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertTrue(partitionInfo.getDataChannel().isOpen()); + assertTrue(partitionInfo.getMetaFile().getChannel().isOpen()); + assertTrue(partitionInfo.getIndexFile().getChannel().isOpen()); + } + } + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 2, 0, 1, 0, 0)); + for (ByteBuffer block : blocks) { + stream2.onData(stream2.getID(), block); + } + stream2.onComplete(stream2.getID()); + closed.acquire(); + // Check if all the file channels created for the first attempt are safely closed. + for (Map partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertFalse(partitionInfo.getDataChannel().isOpen()); + assertFalse(partitionInfo.getMetaFile().getChannel().isOpen()); + assertFalse(partitionInfo.getIndexFile().getChannel().isOpen()); + } + } + try { + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 1, 0, 0)); + } catch (IllegalArgumentException re) { + assertEquals( + "The attempt id 1 in this PushBlockStream message does not match " + + "with the current attempt id 2 stored in shuffle service for application " + + testApp, re.getMessage()); + throw re; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted() + throws IOException, InterruptedException { + String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + try { + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, ATTEMPT_ID_1, 0)); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + String.format("The attempt id %s in this FinalizeShuffleMerge message does not " + + "match with the current attempt id %s stored in shuffle service for application %s", + ATTEMPT_ID_1, ATTEMPT_ID_2, testApp)); + throw e; + } + } + + @Test(expected = ClosedChannelException.class) + public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]), + ByteBuffer.wrap(new byte[6]), + ByteBuffer.wrap(new byte[7]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + // The onData callback should be called 4 times here before the onComplete callback. But a + // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd + // onData callback should all throw ClosedChannelException as their channels are closed. + stream1.onData(stream1.getID(), blocks[0]); + stream1.onData(stream1.getID(), blocks[1]); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + closed.acquire(); + // Should throw ClosedChannelException here. + stream1.onData(stream1.getID(), blocks[3]); + } + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { pushResolver = new RemoteBlockPushResolver(conf) { @Override - AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, - File dataFile, File indexFile, File metaFile) throws IOException { + AppShufflePartitionInfo newAppShufflePartitionInfo(String appId, int shuffleId, + int reduceId, File dataFile, File indexFile, File metaFile) throws IOException { MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile) : new MergeShuffleFile(indexFile); MergeShuffleFile mergedMetaFile = useTestMetaFile ? new TestMergeShuffleFile(metaFile) : new MergeShuffleFile(metaFile); - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile, + return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile, mergedIndexFile, mergedMetaFile); } }; - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } private Path[] createLocalDirs(int numLocalDirs) throws IOException { @@ -846,16 +1085,15 @@ public class RemoteBlockPushResolverSuite { return localDirs; } - private void registerExecutor(String appId, String[] localDirs) throws IOException { - ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, "mergedShuffle"); + private void registerExecutor(String appId, String[] localDirs, String shuffleManagerMeta) { + ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, shuffleManagerMeta); pushResolver.registerExecutor(appId, shuffleInfo); } - private String[] prepareLocalDirs(Path[] localDirs) throws IOException { + private String[] prepareLocalDirs(Path[] localDirs, String mergeDir) throws IOException { String[] blockMgrDirs = new String[localDirs.length]; for (int i = 0; i< localDirs.length; i++) { - Files.createDirectories(localDirs[i].resolve( - RemoteBlockPushResolver.MERGE_MANAGER_DIR + File.separator + "00")); + Files.createDirectories(localDirs[i].resolve(mergeDir + File.separator + "00")); blockMgrDirs[i] = localDirs[i].toFile().getPath() + File.separator + BLOCK_MANAGER_DIR; } return blockMgrDirs; @@ -898,10 +1136,12 @@ public class RemoteBlockPushResolverSuite { private void pushBlockHelper( String appId, + int attemptId, PushBlock[] blocks) throws IOException { for (int i = 0; i < blocks.length; i++) { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(appId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0)); + new PushBlockStream( + appId, attemptId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0)); stream.onData(stream.getID(), blocks[i].buffer); stream.onComplete(stream.getID()); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ef47252189..d11fa554ca 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -583,6 +583,7 @@ class SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = _taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) + _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId)) if (_conf.get(UI_REVERSE_PROXY)) { val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") + "/proxy/" + _applicationId diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3ef964fcb8..39c526cb0e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2244,4 +2244,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = + ConfigBuilder("spark.app.attempt.id") + .internal() + .doc("The application attempt Id assigned from Hadoop YARN. " + + "When the application runs in cluster mode on YARN, there can be " + + "multiple attempts before failing the application") + .version("3.2.0") + .stringConf + .createOptional } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 98d094939c..43c7baf050 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -535,10 +535,17 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer(): Unit = { logInfo("Registering executor with local external shuffle service.") + val shuffleManagerMeta = + if (Utils.isPushBasedShuffleEnabled(conf)) { + s"${shuffleManager.getClass.getName}:" + + s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}" + } else { + shuffleManager.getClass.getName + } val shuffleConfig = new ExecutorShuffleInfo( diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, - shuffleManager.getClass.getName) + shuffleManagerMeta) val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS) val SLEEP_TIME_SECS = 5 diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index d49f43f380..d92f686b41 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -21,11 +21,18 @@ import java.io.{File, IOException} import java.nio.file.Files import java.util.UUID +import scala.collection.mutable.HashMap + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + import org.apache.spark.SparkConf import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.shuffle.ExecutorDiskUtils -import org.apache.spark.storage.DiskBlockManager.MERGE_MANAGER_DIR +import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY +import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY +import org.apache.spark.storage.DiskBlockManager.MERGE_DIRECTORY import org.apache.spark.util.{ShutdownHookManager, Utils} /** @@ -57,6 +64,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + // Get merge directory name, append attemptId if there is any + private val mergeDirName = + s"$MERGE_DIRECTORY${conf.get(config.APP_ATTEMPT_ID).map(id => s"_$id").getOrElse("")}" + // Create merge directories createLocalDirsForMergedShuffleBlocks() @@ -200,12 +211,12 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo // Will create the merge_manager directory only if it doesn't exist under the local dir. Utils.getConfiguredLocalDirs(conf).foreach { rootDir => try { - val mergeDir = new File(rootDir, MERGE_MANAGER_DIR) + val mergeDir = new File(rootDir, mergeDirName) if (!mergeDir.exists()) { // This executor does not find merge_manager directory, it will try to create // the merge_manager directory and the sub directories. logDebug(s"Try to create $mergeDir and its sub dirs since the " + - s"$MERGE_MANAGER_DIR dir does not exist") + s"$mergeDirName dir does not exist") for (dirNum <- 0 until subDirsPerLocalDir) { val subDir = new File(mergeDir, "%02x".format(dirNum)) if (!subDir.exists()) { @@ -219,7 +230,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } catch { case e: IOException => logError( - s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring this directory.", e) + s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e) } } } @@ -264,6 +275,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } } + def getMergeDirectoryAndAttemptIDJsonString(): String = { + val mergedMetaMap: HashMap[String, String] = new HashMap[String, String]() + mergedMetaMap.put(MERGE_DIR_KEY, mergeDirName) + conf.get(config.APP_ATTEMPT_ID).foreach( + attemptId => mergedMetaMap.put(ATTEMPT_ID_KEY, attemptId)) + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(mergedMetaMap) + jsonString + } + private def addShutdownHook(): AnyRef = { logDebug("Adding shutdown hook") // force eager creation of logger ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () => @@ -303,5 +325,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } private[spark] object DiskBlockManager { - private[spark] val MERGE_MANAGER_DIR = "merge_manager" + val MERGE_DIRECTORY = "merge_manager" + val MERGE_DIR_KEY = "mergeDir" + val ATTEMPT_ID_KEY = "attemptId" } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b1df7bd03d..89aa299250 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2592,32 +2592,13 @@ private[spark] object Utils extends Logging { /** * Push based shuffle can only be enabled when the application is submitted - * to run in YARN mode, with external shuffle service enabled and - * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1. - * TODO: Remove the requirement on spark.yarn.maxAttempts after SPARK-35546 - * Support push based shuffle with multiple app attempts + * to run in YARN mode, with external shuffle service enabled */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { conf.get(PUSH_BASED_SHUFFLE_ENABLED) && (conf.get(IS_TESTING).getOrElse(false) || (conf.get(SHUFFLE_SERVICE_ENABLED) && - conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && - getYarnMaxAttempts(conf) == 1)) - } - - /** - * Returns the maximum number of attempts to register the AM in YARN mode. - * TODO: Remove this method after SPARK-35546 Support push based shuffle - * with multiple app attempts - */ - def getYarnMaxAttempts(conf: SparkConf): Int = { - val sparkMaxAttempts = conf.getOption("spark.yarn.maxAttempts").map(_.toInt) - val yarnMaxAttempts = getSparkOrYarnConfig(conf, YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.toString).toInt - sparkMaxAttempts match { - case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts - case None => yarnMaxAttempts - } + conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) } /** diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 6397c96f36..0443c40bce 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -20,7 +20,10 @@ package org.apache.spark.storage import java.io.{File, FileWriter} import java.nio.file.{Files, Paths} import java.nio.file.attribute.PosixFilePermissions +import java.util.HashMap +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.commons.io.FileUtils import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -91,11 +94,11 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B } test("should still create merge directories if one already exists under a local dir") { - val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR) + val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_DIRECTORY) if (!mergeDir0.exists()) { Files.createDirectories(mergeDir0.toPath) } - val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR) + val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_DIRECTORY) if (mergeDir1.exists()) { Utils.deleteRecursively(mergeDir1) } @@ -104,7 +107,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B testConf.set(config.Tests.IS_TESTING, true) diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) assert(Utils.getConfiguredLocalDirs(testConf).map( - rootDir => new File(rootDir, DiskBlockManager.MERGE_MANAGER_DIR)) + rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY)) .filter(mergeDir => mergeDir.exists()).length === 2) // mergeDir0 will be skipped as it already exists assert(mergeDir0.list().length === 0) @@ -124,6 +127,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B FileUtils.deleteQuietly(testDir) } + test("Encode merged directory name and attemptId in shuffleManager field") { + testConf.set(config.APP_ATTEMPT_ID, "1"); + diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) + val mergedShuffleMeta = diskBlockManager.getMergeDirectoryAndAttemptIDJsonString(); + val mapper: ObjectMapper = new ObjectMapper + val typeRef: TypeReference[HashMap[String, String]] = + new TypeReference[HashMap[String, String]]() {} + val metaMap: HashMap[String, String] = mapper.readValue(mergedShuffleMeta, typeRef) + val mergeDir = metaMap.get(DiskBlockManager.MERGE_DIR_KEY) + assert(mergeDir.equals(DiskBlockManager.MERGE_DIRECTORY + "_1")) + val attemptId = metaMap.get(DiskBlockManager.ATTEMPT_ID_KEY) + assert(attemptId.equals("1")) + } + def writeToFile(file: File, numBytes: Int): Unit = { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index dba7e39e93..095dbefdb2 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1450,7 +1450,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.yarn.maxAttempts", "1") assert(Utils.isPushBasedShuffleEnabled(conf) === true) conf.set("spark.yarn.maxAttempts", "2") - assert(Utils.isPushBasedShuffleEnabled(conf) === false) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) } }