[SPARK-32916][SHUFFLE] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode

### What changes were proposed in this pull request?
This is one of the patches for the push-based shuffle SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
Summary of changes:
- Adds an implementation of `MergedShuffleFileManager`, the interface introduced in [SPARK-32915](https://issues.apache.org/jira/browse/SPARK-32915).
- Integrates the push-based shuffle service with `YarnShuffleService`.

### Why are the changes needed?
Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.
A reference PR consolidating the changes for the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the performance improvements documented in the SPIP doc.


Closes #30062 from otterc/SPARK-32916.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Commit 8113c88542 (parent 84dc374611), authored by Chandni Singh on 2020-11-09 11:00:52 -06:00 and committed by Mridul Muralidharan.
18 changed files with 1,747 additions and 87 deletions.

File: Encoders.java

@@ -18,6 +18,7 @@
package org.apache.spark.network.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBuf;
@@ -46,7 +47,11 @@ public class Encoders {
}
}
/** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
/**
* Bitmaps are encoded with their serialization length followed by the serialization bytes.
*
* @since 3.1.0
*/
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
@@ -57,13 +62,20 @@ public class Encoders {
return b.serializedSizeInBytes();
}
/**
* The input ByteBuf for this encoder should have enough write capacity to fit the serialized
* bitmap. Other encoders which use {@link io.netty.buffer.AbstractByteBuf#writeBytes(byte[])}
* to write can expand the buf as writeBytes calls {@link ByteBuf#ensureWritable} internally.
* However, this encoder doesn't rely on netty's writeBytes and will fail if the input buf
* doesn't have enough write capacity.
*/
public static void encode(ByteBuf buf, RoaringBitmap b) {
int encodedLength = b.serializedSizeInBytes();
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
// ByteBuffer, and the write is reflected in the underneath ByteBuf.
b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
buf.writerIndex(buf.writerIndex() + encodedLength);
ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes());
b.serialize(byteBuffer);
buf.writerIndex(buf.writerIndex() + byteBuffer.position());
}
public static RoaringBitmap decode(ByteBuf buf) {
@@ -172,7 +184,11 @@ public class Encoders {
}
}
/** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
/**
* Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding.
*
* @since 3.1.0
*/
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4;

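The capacity contract called out in the `Bitmaps.encode` Javadoc above is worth illustrating. A minimal sketch (the same round trip that `EncodersSuite` below exercises): size the buffer from `encodedLength()` before encoding.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.roaringbitmap.RoaringBitmap;
import org.apache.spark.network.protocol.Encoders;

class BitmapEncodeSketch {
  static RoaringBitmap roundTrip(RoaringBitmap bitmap) {
    // Size the buffer up front: Bitmaps.encode writes through a fixed-capacity
    // nio view of the ByteBuf and, unlike writeBytes, cannot grow it on demand.
    ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
    Encoders.Bitmaps.encode(buf, bitmap);
    return Encoders.Bitmaps.decode(buf);
  }
}
```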
File: TransportConf.java

@@ -363,4 +363,39 @@ public class TransportConf {
return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
}
/**
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
* a cluster level because this configuration is set to
* 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
* To turn on push-based shuffle at a cluster level, set the configuration to
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
*/
public String mergedShuffleFileManagerImpl() {
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
}
/**
* The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
* push-based shuffle.
* A merged shuffle file consists of multiple small shuffle blocks. Fetching the
* complete merged shuffle file in a single response increases the memory requirements for the
* clients. Instead of serving the entire merged file, the shuffle service serves the
* merged file in `chunks`. A `chunk` comprises a few shuffle blocks in their entirety, and this
* configuration controls how big a chunk can get. A corresponding index file for each merged
* shuffle file will be generated indicating chunk boundaries.
*/
public int minChunkSizeInMergedShuffleFile() {
return Ints.checkedCast(JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
}
/**
* The size of cache in memory which is used in push-based shuffle for storing merged index files.
*/
public long mergedIndexCacheSize() {
return JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
}
}
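To make the cluster-level switch concrete, here is a minimal sketch that builds a `TransportConf` with push-based shuffle enabled, using the existing `MapConfigProvider` helper; the keys and values are the ones documented above.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

class PushShuffleConfSketch {
  static TransportConf pushEnabledConf() {
    Map<String, String> settings = new HashMap<>();
    // Swap the NoOp default for the real merge manager to enable push-based shuffle.
    settings.put("spark.shuffle.server.mergedShuffleFileManagerImpl",
        "org.apache.spark.network.shuffle.RemoteBlockPushResolver");
    // Optional tuning knobs, shown here with their defaults.
    settings.put("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m");
    settings.put("spark.shuffle.server.mergedIndexCacheSize", "100m");
    return new TransportConf("shuffle", new MapConfigProvider(settings));
  }
}
```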

File: EncodersSuite.java (new file)

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;
import static org.junit.Assert.*;
/**
* Tests for {@link Encoders}.
*/
public class EncodersSuite {
@Test
public void testRoaringBitmapEncodeDecode() {
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1, 2, 3);
ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
Encoders.Bitmaps.encode(buf, bitmap);
RoaringBitmap decodedBitmap = Encoders.Bitmaps.decode(buf);
assertEquals(bitmap, decodedBitmap);
}
@Test (expected = java.nio.BufferOverflowException.class)
public void testRoaringBitmapEncodeShouldFailWhenBufferIsSmall() {
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1, 2, 3);
ByteBuf buf = Unpooled.buffer(4);
Encoders.Bitmaps.encode(buf, bitmap);
}
@Test
public void testBitmapArraysEncodeDecode() {
RoaringBitmap[] bitmaps = new RoaringBitmap[] {
new RoaringBitmap(),
new RoaringBitmap(),
new RoaringBitmap(), // empty
new RoaringBitmap(),
new RoaringBitmap()
};
bitmaps[0].add(1, 2, 3);
bitmaps[1].add(1, 2, 4);
bitmaps[3].add(7L, 9L);
bitmaps[4].add(1L, 100L);
ByteBuf buf = Unpooled.buffer(Encoders.BitmapArrays.encodedLength(bitmaps));
Encoders.BitmapArrays.encode(buf, bitmaps);
RoaringBitmap[] decodedBitmaps = Encoders.BitmapArrays.decode(buf);
assertArrayEquals(bitmaps, decodedBitmaps);
}
}

File: pom.xml

@@ -47,6 +47,11 @@
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
@@ -70,11 +75,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

File: ErrorHandler.java

@@ -21,14 +21,18 @@ import java.net.ConnectException;
import com.google.common.base.Throwables;
import org.apache.spark.annotation.Evolving;
/**
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
* and logged.
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
* - remaining retries < max retries
* - exception is an IOException
*
* @since 3.1.0
*/
@Evolving
public interface ErrorHandler {
boolean shouldRetryError(Throwable t);
@@ -44,6 +48,8 @@ public interface ErrorHandler {
/**
* The error handler for pushing shuffle blocks to remote shuffle services.
*
* @since 3.1.0
*/
class BlockPushErrorHandler implements ErrorHandler {
/**

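To show the plug-in point, here is a hypothetical handler (not the `BlockPushErrorHandler` this PR ships, though that one applies similar message-based checks): stop retrying once the server reports the block arrived after the merge was finalized.

```java
import org.apache.spark.network.shuffle.ErrorHandler;

class GiveUpWhenTooLateHandler implements ErrorHandler {
  @Override
  public boolean shouldRetryError(Throwable t) {
    String message = t.getMessage();
    // A block rejected as "too late" can never succeed on retry.
    return message == null
        || !message.contains(ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX);
  }

  @Override
  public boolean shouldLogError(Throwable t) {
    // Only log errors that were worth retrying.
    return shouldRetryError(t);
  }
}
```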
File: ExternalBlockHandler.java

@@ -68,7 +68,7 @@ public class ExternalBlockHandler extends RpcHandler {
throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
new NoOpMergedShuffleFileManager());
new NoOpMergedShuffleFileManager(conf));
}
public ExternalBlockHandler(
@@ -89,7 +89,7 @@ public class ExternalBlockHandler extends RpcHandler {
public ExternalBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockResolver blockManager) {
this(streamManager, blockManager, new NoOpMergedShuffleFileManager());
this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null));
}
/** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
@@ -175,7 +175,7 @@ public class ExternalBlockHandler extends RpcHandler {
RegisterExecutor msg = (RegisterExecutor) msgObj;
checkAuth(client, msg.appId);
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
mergeManager.registerExecutor(msg.appId, msg.executorInfo.localDirs);
mergeManager.registerExecutor(msg.appId, msg.executorInfo);
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
} finally {
responseDelayContext.stop();
@@ -232,6 +232,7 @@ public class ExternalBlockHandler extends RpcHandler {
*/
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
mergeManager.applicationRemoved(appId, cleanupLocalDirs);
}
/**
@@ -430,8 +431,15 @@ public class ExternalBlockHandler extends RpcHandler {
/**
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
* is not enabled.
*
* @since 3.1.0
*/
private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
// This constructor is needed because we use this constructor to instantiate an implementation
// of MergedShuffleFileManager using reflection.
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
@@ -444,18 +452,13 @@ public class ExternalBlockHandler extends RpcHandler {
}
@Override
public void registerApplication(String appId, String user) {
// No-op. Do nothing.
}
@Override
public void registerExecutor(String appId, String[] localDirs) {
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
// No-Op. Do nothing.
}
@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
// No-Op. Do nothing.
}
@Override

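The `(TransportConf)` constructor above exists purely for reflective instantiation. A sketch of what `YarnShuffleService#newMergedShuffleFileManagerInstance` plausibly looks like (simplified error handling; not the exact implementation):

```java
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.util.TransportConf;

class MergeManagerFactorySketch {
  static MergedShuffleFileManager newInstance(TransportConf conf) {
    try {
      // Every implementation is expected to expose a (TransportConf) constructor.
      Class<?> cls = Class.forName(conf.mergedShuffleFileManagerImpl());
      return (MergedShuffleFileManager) cls
          .getConstructor(TransportConf.class).newInstance(conf);
    } catch (ReflectiveOperationException e) {
      // Fall back to the no-op manager, i.e. push-based shuffle stays disabled.
      return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
    }
  }
}
```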
File: MergedBlockMeta.java

@@ -34,6 +34,8 @@ import org.apache.spark.network.protocol.Encoders;
* 1. Number of chunks in a merged shuffle block.
* 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
* merged to that merged block chunk.
*
* @since 3.1.0
*/
public class MergedBlockMeta {
private final int numChunks;

File: MergedShuffleFileManager.java

@@ -19,13 +19,14 @@ package org.apache.spark.network.shuffle;
import java.io.IOException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
/**
* The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
* alongside {@link ExternalBlockHandler} and serves as an RpcHandler for
@@ -33,7 +34,10 @@ import org.apache.spark.network.shuffle.protocol.PushBlockStream;
* remotely pushed streams of shuffle blocks to merge them into merged shuffle files. Right
* now, support for push based shuffle is only implemented for external shuffle service in
* YARN mode.
*
* @since 3.1.0
*/
@Evolving
public interface MergedShuffleFileManager {
/**
* Provides the stream callback used to process a remotely pushed block. The callback is
@@ -56,25 +60,15 @@ public interface MergedShuffleFileManager {
MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException;
/**
* Registers an application when it starts. It also stores the username which is necessary
* for generating the host local directories for merged shuffle files.
* Right now, this is invoked by YarnShuffleService.
* Registers an executor with MergedShuffleFileManager. This executor-info provides
* the directories and number of sub-dirs per dir so that MergedShuffleFileManager knows where to
* store and look for shuffle data for a given application. It is invoked by the RPC call when
* an executor registers with the local shuffle service.
*
* @param appId application ID
* @param user username
* @param executorInfo Executor info with the local dirs and sub-dirs per dir that this
*                     executor is granted by the NodeManager
*/
void registerApplication(String appId, String user);
/**
* Registers an executor with its local dir list when it starts. This provides the specific path
* so MergedShuffleFileManager knows where to store and look for shuffle data for a
* given application. It is invoked by the RPC call when executor tries to register with the
* local shuffle service.
*
* @param appId application ID
* @param localDirs The list of local dirs that this executor gets granted from NodeManager
*/
void registerExecutor(String appId, String[] localDirs);
void registerExecutor(String appId, ExecutorShuffleInfo executorInfo);
/**
* Invoked when an application finishes. This cleans up any remaining metadata associated with

File: OneForOneBlockPusher.java

@@ -35,10 +35,13 @@ import org.apache.spark.network.shuffle.protocol.PushBlockStream;
* be merged instead of for fetching them from remote shuffle services. This is used by
* ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
* is used to handle the success or failure in pushing each block.
*
* @since 3.1.0
*/
public class OneForOneBlockPusher {
private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
public static final String SHUFFLE_PUSH_BLOCK_PREFIX = "shufflePush";
private final TransportClient client;
private final String appId;
@@ -115,7 +118,13 @@ public class OneForOneBlockPusher {
for (int i = 0; i < blockIds.length; i++) {
assert buffers.containsKey(blockIds[i]) : "Could not find the block buffer for block "
+ blockIds[i];
ByteBuffer header = new PushBlockStream(appId, blockIds[i], i).toByteBuffer();
String[] blockIdParts = blockIds[i].split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals(SHUFFLE_PUSH_BLOCK_PREFIX)) {
throw new IllegalArgumentException(
"Unexpected shuffle push block id format: " + blockIds[i]);
}
ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]), i).toByteBuffer();
client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
new BlockPushCallback(i, blockIds[i]));
}

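The pusher now derives the `PushBlockStream` fields from the block id itself; the expected format is `shufflePush_<shuffleId>_<mapIndex>_<reduceId>`, matching the stream id that `RemoteBlockPushResolver` constructs. A small sketch with made-up ids:

```java
import org.apache.spark.network.shuffle.OneForOneBlockPusher;

class PushBlockIdSketch {
  public static void main(String[] args) {
    // "shufflePush_5_3_0": shuffle 5, map index 3, reduce partition 0.
    String blockId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_5_3_0";
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shufflePush")) {
      throw new IllegalArgumentException("Unexpected shuffle push block id: " + blockId);
    }
    int shuffleId = Integer.parseInt(parts[1]); // 5
    int mapIndex = Integer.parseInt(parts[2]);  // 3
    int reduceId = Integer.parseInt(parts[3]);  // 0
    System.out.printf("shuffle %d map %d reduce %d%n", shuffleId, mapIndex, reduceId);
  }
}
```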
File: RemoteBlockPushResolver.java (new file)

@@ -0,0 +1,934 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.collect.Maps;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
/**
* An implementation of {@link MergedShuffleFileManager} that provides the most essential shuffle
* service processing logic to support push based shuffle.
*
* @since 3.1.0
*/
public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
@VisibleForTesting
static final String MERGE_MANAGER_DIR = "merge_manager";
private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions;
private final Executor directoryCleaner;
private final TransportConf conf;
private final int minChunkSize;
private final ErrorHandler.BlockPushErrorHandler errorHandler;
@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<File, ShuffleIndexInformation> indexCache;
@SuppressWarnings("UnstableApiUsage")
public RemoteBlockPushResolver(TransportConf conf) {
this.conf = conf;
this.partitions = Maps.newConcurrentMap();
this.appsPathInfo = Maps.newConcurrentMap();
this.directoryCleaner = Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
indexCache = CacheBuilder.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}
/**
* Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an
* application, retrieves the associated metadata. If not present and the corresponding merged
* shuffle does not exist, initializes the metadata.
*/
private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
AppShuffleId appShuffleId,
int reduceId) {
File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
// If this partition is already finalized then the partitions map will not contain
// the appShuffleId but the data file would exist. In that case the block is considered late.
return null;
}
Map<Integer, AppShufflePartitionInfo> shufflePartitions =
partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
return shufflePartitions.computeIfAbsent(reduceId, key -> {
// It only gets here when the key is not present in the map. This could either
// be the first time the merge manager receives a pushed block for a given application
// shuffle partition, or after the merged shuffle file is finalized. We handle these
// two cases accordingly by checking if the file already exists.
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
try {
if (dataFile.exists()) {
return null;
} else {
return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
}
} catch (IOException e) {
logger.error(
"Cannot create merged shuffle partition with data file {}, index file {}, and "
+ "meta file {}", dataFile.getAbsolutePath(),
indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
throw new RuntimeException(
String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
+ "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
}
});
}
@Override
public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
if (!indexFile.exists()) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s not found", indexFile.getPath()));
}
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
if (!metaFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle meta file %s not found",
metaFile.getPath()));
}
FileSegmentManagedBuffer chunkBitMaps =
new FileSegmentManagedBuffer(conf, metaFile, 0L, metaFile.length());
logger.trace(
"{} shuffleId {} reduceId {} num chunks {}", appId, shuffleId, reduceId, numChunks);
return new MergedBlockMeta(numChunks, chunkBitMaps);
}
@SuppressWarnings("UnstableApiUsage")
@Override
public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId) {
AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
if (!dataFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle data file %s not found",
dataFile.getPath()));
}
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
try {
// If we get here, the merged shuffle file should have been properly finalized. Thus we can
// use the file length to determine the size of the merged shuffle block.
ShuffleIndexInformation shuffleIndexInformation = indexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
}
}
/**
* The logic here is consistent with
* org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
*/
private File getFile(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
appPathsInfo.subDirsPerLocalDir, filename);
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
return targetFile;
}
private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, fileName);
}
private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, indexName);
}
private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, metaName);
}
@Override
public String[] getMergedBlockDirs(String appId) {
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
"application " + appId
+ " active local dirs list has not been updated by any executor registration");
return activeLocalDirs;
}
@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
// TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
"application " + appId + " is not registered or NM was restarted.");
Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
partitions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
AppShuffleId appShuffleId = entry.getKey();
if (appId.equals(appShuffleId.appId)) {
iterator.remove();
for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
partitionInfo.closeAllFiles();
}
}
}
if (cleanupLocalDirs) {
Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs)
.map(dir -> Paths.get(dir)).toArray(Path[]::new);
directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
}
}
/**
* Serially delete local dirs, executed in a separate thread.
*/
@VisibleForTesting
void deleteExecutorDirs(Path[] dirs) {
for (Path localDir : dirs) {
try {
if (Files.exists(localDir)) {
JavaUtils.deleteRecursively(localDir.toFile());
logger.debug("Successfully cleaned up directory: {}", localDir);
}
} catch (Exception e) {
logger.error("Failed to delete directory: {}", localDir, e);
}
}
}
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
// Retrieve merged shuffle file metadata
AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
AppShufflePartitionInfo partitionInfoBeforeCheck =
getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
// Here partitionInfo will be null in 2 cases:
// 1) The request is received for a block that has already been merged, this is possible due
// to the retry logic.
// 2) The request is received after the merged shuffle is finalized, thus is too late.
//
// For case 1, we will drain the data in the channel and just respond success
// to the client. This is required because the response of the previously merged
// block will be ignored by the client, per the logic in RetryingBlockFetcher.
// Note that the netty server should receive data for a given block id only from 1 channel
// at any time. The block should be pushed only from successful maps, thus there should be
// only 1 source for a given block at any time. Although the netty client might retry sending
// this block to the server multiple times, the data of the same block always arrives from the
// same channel thus the server should have already processed the previous request of this
// block before seeing it again in the channel. This guarantees that we can simply just
// check the bitmap to determine if a block is a duplicate or not.
//
// For case 2, we will also drain the data in the channel, but throw an exception in
// {@link org.apache.spark.network.client.StreamCallback#onComplete(String)}. This way,
// the client will be notified of the failure but the channel will remain active. Keeping
// the channel alive is important because the same channel could be reused by multiple map
// tasks in the executor JVM, which belongs to different stages. While one of the shuffles
// in these stages is finalized, the others might still be active. Tearing down the channel
// on the server side will disrupt these other on-going shuffle merges. It's also important
// to notify the client of the failure, so that it can properly halt pushing the remaining
// blocks upon receiving such failures to preserve resources on the server/client side.
//
// Speculative execution would also raise a possible scenario with duplicate blocks. Although
// speculative execution would kill the slower task attempt, leading to only 1 task attempt
// succeeding in the end, there is no guarantee that only one copy of the block will be
// pushed. This is due to our handling of block push process outside of the map task, thus
// it is possible for the speculative task attempt to initiate the block push process before
// getting killed. When this happens, we need to distinguish the duplicate blocks as they
// arrive. More details on this is explained in later comments.
// Track if the block is received after shuffle merge finalize
final boolean isTooLate = partitionInfoBeforeCheck == null;
// Check if the given block is already merged by checking the bitmap against the given map index
final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
&& partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
: partitionInfoBeforeCheck;
final String streamId = String.format("%s_%d_%d_%d",
OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex,
msg.reduceId);
if (partitionInfo != null) {
return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex);
} else {
// For a duplicate block or a block which is late, respond back with a callback that handles
// them differently.
return new StreamCallbackWithID() {
@Override
public String getID() {
return streamId;
}
@Override
public void onData(String streamId, ByteBuffer buf) {
// Ignore the requests. It reaches here either when a request is received after the
// shuffle file is finalized or when a request is for a duplicate block.
}
@Override
public void onComplete(String streamId) {
if (isTooLate) {
// Throw an exception here so the block data is drained from channel and server
// responds RpcFailure to the client.
throw new RuntimeException(String.format("Block %s %s", streamId,
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
}
// For duplicate block that is received before the shuffle merge finalizes, the
// server should respond success to the client.
}
@Override
public void onFailure(String streamId, Throwable cause) {
}
};
}
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId);
AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId);
MergeStatuses mergeStatuses;
if (shufflePartitions == null || shufflePartitions.isEmpty()) {
mergeStatuses =
new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]);
} else {
Collection<AppShufflePartitionInfo> partitionsToFinalize = shufflePartitions.values();
int totalPartitions = partitionsToFinalize.size();
RoaringBitmap[] bitmaps = new RoaringBitmap[totalPartitions];
int[] reduceIds = new int[totalPartitions];
long[] sizes = new long[totalPartitions];
Iterator<AppShufflePartitionInfo> partitionsIter = partitionsToFinalize.iterator();
int idx = 0;
while (partitionsIter.hasNext()) {
AppShufflePartitionInfo partition = partitionsIter.next();
synchronized (partition) {
// Get rid of any partial block data at the end of the file. This could either
// be due to failure or a request still being processed when the shuffle
// merge gets finalized.
try {
partition.dataChannel.truncate(partition.getPosition());
if (partition.getPosition() != partition.getLastChunkOffset()) {
partition.updateChunkInfo(partition.getPosition(), partition.lastMergedMapIndex);
}
bitmaps[idx] = partition.mapTracker;
reduceIds[idx] = partition.reduceId;
sizes[idx++] = partition.getPosition();
} catch (IOException ioe) {
logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
msg.shuffleId, partition.reduceId, ioe);
} finally {
partition.closeAllFiles();
// The partition should be removed after the files are written so that any new stream
// for the same reduce partition will see that the data file exists.
partitionsIter.remove();
}
}
}
mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes);
}
partitions.remove(appShuffleId);
logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
return mergeStatuses;
}
@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
+ "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
executorInfo.subDirsPerLocalDir);
}
appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, executorInfo.localDirs,
executorInfo.subDirsPerLocalDir));
}
private static String generateFileName(AppShuffleId appShuffleId, int reduceId) {
return String.format("mergedShuffle_%s_%d_%d", appShuffleId.appId, appShuffleId.shuffleId,
reduceId);
}
/**
* Callback for push stream that handles blocks which are not already merged.
*/
static class PushBlockStreamCallback implements StreamCallbackWithID {
private final RemoteBlockPushResolver mergeManager;
private final String streamId;
private final int mapIndex;
private final AppShufflePartitionInfo partitionInfo;
private int length = 0;
// This indicates that this stream got the opportunity to write the blocks to the merged file.
// Once this is set to true and the stream encounters a failure then it will take necessary
// action to overwrite any partial written data. This is reset to false when the stream
// completes without any failures.
private boolean isWriting = false;
// Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
private List<ByteBuffer> deferredBufs;
private PushBlockStreamCallback(
RemoteBlockPushResolver mergeManager,
String streamId,
AppShufflePartitionInfo partitionInfo,
int mapIndex) {
this.mergeManager = Preconditions.checkNotNull(mergeManager);
this.streamId = streamId;
this.partitionInfo = Preconditions.checkNotNull(partitionInfo);
this.mapIndex = mapIndex;
}
@Override
public String getID() {
return streamId;
}
/**
* Write a ByteBuffer to the merged shuffle file. Here we keep track of the length of the
* block data written to file. In case of failure during writing block to file, we use the
* information tracked in partitionInfo to overwrite the corrupt block when writing the new
* block.
*/
private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) {
if (partitionInfo.isEncounteredFailure()) {
long updatedPos = partitionInfo.getPosition() + length;
logger.debug(
"{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos);
length += partitionInfo.dataChannel.write(buf, updatedPos);
} else {
length += partitionInfo.dataChannel.write(buf);
}
}
}
/**
* There will be multiple streams of map blocks belonging to the same reduce partition. At any
* given point of time, only a single map stream can write its data to the merged file. Until
* this stream is completed, the other streams defer writing. This prevents corruption of
* merged data. This returns whether this stream is the active stream that can write to the
* merged file.
*/
private boolean allowedToWrite() {
return partitionInfo.getCurrentMapIndex() < 0
|| partitionInfo.getCurrentMapIndex() == mapIndex;
}
/**
* Returns if this is a duplicate block generated by speculative tasks. With speculative
* tasks, we could receive the same block from 2 different sources at the same time. One of
* them is going to be the first to set the currentMapIndex. When that block does so, it's
* going to see the currentMapIndex initially as -1. After it sets the currentMapIndex, it's
* going to write some data to disk, thus increasing the length counter. The other duplicate
* block is going to see the currentMapIndex already set to its mapIndex. However, it hasn't
* written any data yet. If the first block gets written completely and resets the
* currentMapIndex to -1 before the processing for the second block finishes, we can just
* check the bitmap to identify the second as a duplicate.
*/
private boolean isDuplicateBlock() {
return (partitionInfo.getCurrentMapIndex() == mapIndex && length == 0)
|| partitionInfo.mapTracker.contains(mapIndex);
}
/**
* This is only invoked when the stream is able to write. The stream first writes any deferred
* block parts buffered in memory.
*/
private void writeAnyDeferredBufs() throws IOException {
if (deferredBufs != null && !deferredBufs.isEmpty()) {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
}
}
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
// When handling the block data using StreamInterceptor, it can help to reduce the amount
// of data that needs to be buffered in memory since it does not wait till the completion
// of the frame before handling the message, thus releasing the ByteBuf earlier. However,
// this also means it would chunk a block into multiple buffers. Here, we want to preserve
// the benefit of handling the block data using StreamInterceptor as much as possible while
// providing the guarantee that one block would be continuously written to the merged
// shuffle file before the next block starts. For each shuffle partition, we would track
// the current map index to make sure only block matching the map index can be written to
// disk. If one server thread sees the block being handled is the current block, it would
// directly write the block to disk. Otherwise, it would buffer the block chunks in memory.
// If the block becomes the current block before we see the end of it, we would then dump
// all buffered block data to disk and write the remaining portions of the block directly
// to disk as well. This way, we avoid having to buffer the entirety of every block in
// memory, while still providing the necessary guarantee.
synchronized (partitionInfo) {
Map<Integer, AppShufflePartitionInfo> shufflePartitions =
mergeManager.partitions.get(partitionInfo.appShuffleId);
// If the partitionInfo corresponding to (appId, shuffleId, reduceId) is no longer present
// then it means that the shuffle merge has already been finalized. We should thus ignore
// the data and just drain the remaining bytes of this message. This check should be
// placed inside the synchronized block to make sure that checking the key is still
// present and processing the data is atomic.
if (shufflePartitions == null || !shufflePartitions.containsKey(partitionInfo.reduceId)) {
deferredBufs = null;
return;
}
// Check whether we can write to disk
if (allowedToWrite()) {
isWriting = true;
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
logger.trace("{} shuffleId {} reduceId {} onData writable",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId);
if (partitionInfo.getCurrentMapIndex() < 0) {
partitionInfo.setCurrentMapIndex(mapIndex);
}
// If we got here, it's safe to write the block data to the merged shuffle file. We
// first write any deferred block.
writeAnyDeferredBufs();
writeBuf(buf);
// If we got here, it means we successfully wrote the current chunk of the block to the merged
// shuffle file. If we encountered failure while writing the previous block, we should
// reset the file channel position and the status of partitionInfo to indicate that we
// have recovered from previous disk write failure. However, we do not update the
// position tracked by partitionInfo here. That is only updated while the entire block
// is successfully written to merged shuffle file.
if (partitionInfo.isEncounteredFailure()) {
partitionInfo.dataChannel.position(partitionInfo.getPosition() + length);
partitionInfo.setEncounteredFailure(false);
}
} else {
logger.trace("{} shuffleId {} reduceId {} onData deferred",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId);
// If we cannot write to disk, we buffer the current block chunk in memory so it could
// potentially be written to disk later. We take our best effort without guarantee
// that the block will be written to disk. If the block data is divided into multiple
// chunks during TCP transportation, each #onData invocation is an attempt to write
// the block to disk. If the block is still not written to disk after all #onData
// invocations, the final #onComplete invocation is the last attempt to write the
// block to disk. If we still couldn't write this block to disk after this, we give up
// on this block push request and respond failure to client. We could potentially
// buffer the block longer or wait for a few iterations inside #onData or #onComplete
// to increase the chance of writing the block to disk, however this would incur more
// memory footprint or decrease the server processing throughput for the shuffle
// service. In addition, during testing we observed that by randomizing the order in
// which clients send batches of block push requests, only ~0.5% of blocks failed to be
// written to disk due to this reason. We thus decide to optimize for server
// throughput and memory usage.
if (deferredBufs == null) {
deferredBufs = new LinkedList<>();
}
// Write the buffer to the in-memory deferred cache. Since buf is a slice of a larger
// byte buffer, we cache only the relevant bytes not the entire large buffer to save
// memory.
ByteBuffer deferredBuf = ByteBuffer.allocate(buf.remaining());
deferredBuf.put(buf);
deferredBuf.flip();
deferredBufs.add(deferredBuf);
}
}
}
@Override
public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) {
logger.trace("{} shuffleId {} reduceId {} onComplete invoked",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId);
Map<Integer, AppShufflePartitionInfo> shufflePartitions =
mergeManager.partitions.get(partitionInfo.appShuffleId);
// When this request initially got to the server, the shuffle merge finalize request
// was not received yet. By the time we finish reading this message, the shuffle merge
// however is already finalized. We should thus respond RpcFailure to the client.
if (shufflePartitions == null || !shufflePartitions.containsKey(partitionInfo.reduceId)) {
deferredBufs = null;
throw new RuntimeException(String.format("Block %s %s", streamId,
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
}
// Check if we can commit this block
if (allowedToWrite()) {
isWriting = true;
// Identify duplicate block generated by speculative tasks. We respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
if (partitionInfo.getCurrentMapIndex() < 0) {
writeAnyDeferredBufs();
}
long updatedPos = partitionInfo.getPosition() + length;
boolean indexUpdated = false;
if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
partitionInfo.updateChunkInfo(updatedPos, mapIndex);
indexUpdated = true;
}
partitionInfo.setPosition(updatedPos);
partitionInfo.setCurrentMapIndex(-1);
// update merged results
partitionInfo.blockMerged(mapIndex);
if (indexUpdated) {
partitionInfo.resetChunkTracker();
}
} else {
deferredBufs = null;
throw new RuntimeException(String.format("%s %s to merged shuffle",
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX,
streamId));
}
}
isWriting = false;
}
@Override
public void onFailure(String streamId, Throwable throwable) throws IOException {
if (mergeManager.errorHandler.shouldLogError(throwable)) {
logger.error("Encountered issue when merging {}", streamId, throwable);
} else {
logger.debug("Encountered issue when merging {}", streamId, throwable);
}
// Only update partitionInfo if the failure corresponds to a valid request. If the
// request is too late, i.e. received after shuffle merge finalize, #onFailure will
// also be triggered, and we can just ignore. Also, if we couldn't find an opportunity
// to write the block data to disk, we should also ignore here.
if (isWriting) {
synchronized (partitionInfo) {
Map<Integer, AppShufflePartitionInfo> shufflePartitions =
mergeManager.partitions.get(partitionInfo.appShuffleId);
if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
logger.debug("{} shuffleId {} reduceId {} set encountered failure",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId);
partitionInfo.setCurrentMapIndex(-1);
partitionInfo.setEncounteredFailure(true);
}
}
}
}
}
/**
* ID that uniquely identifies a shuffle for an application. This is used as a key in
* {@link #partitions}.
*/
public static class AppShuffleId {
public final String appId;
public final int shuffleId;
AppShuffleId(String appId, int shuffleId) {
this.appId = appId;
this.shuffleId = shuffleId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AppShuffleId that = (AppShuffleId) o;
return shuffleId == that.shuffleId && Objects.equal(appId, that.appId);
}
@Override
public int hashCode() {
return Objects.hashCode(appId, shuffleId);
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("appId", appId)
.add("shuffleId", shuffleId)
.toString();
}
}
/** Metadata tracked for an actively merged shuffle partition */
public static class AppShufflePartitionInfo {
private final AppShuffleId appShuffleId;
private final int reduceId;
// The merged shuffle data file channel
public FileChannel dataChannel;
// Location offset of the last successfully merged block for this shuffle partition
private long position;
// Indicating whether failure was encountered when merging the previous block
private boolean encounteredFailure;
// Track the map index whose block is being merged for this shuffle partition
private int currentMapIndex;
// Bitmap tracking which mapper's blocks have been merged for this shuffle partition
private RoaringBitmap mapTracker;
// The index file for a particular merged shuffle contains the chunk offsets.
private RandomAccessFile indexFile;
// The meta file for a particular merged shuffle contains all the map indices that belong to
// every chunk. The entry per chunk is a serialized bitmap.
private RandomAccessFile metaFile;
// The offset for the last chunk tracked in the index file for this shuffle partition
private long lastChunkOffset;
private int lastMergedMapIndex = -1;
// Bitmap tracking which mapper's blocks are in the current shuffle chunk
private RoaringBitmap chunkTracker;
AppShufflePartitionInfo(
AppShuffleId appShuffleId,
int reduceId,
File dataFile,
File indexFile,
File metaFile) throws IOException {
this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id");
this.reduceId = reduceId;
this.dataChannel = new FileOutputStream(dataFile).getChannel();
this.indexFile = new RandomAccessFile(indexFile, "rw");
this.metaFile = new RandomAccessFile(metaFile, "rw");
this.currentMapIndex = -1;
// Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex()
updateChunkInfo(0L, -1);
this.position = 0;
this.encounteredFailure = false;
this.mapTracker = new RoaringBitmap();
this.chunkTracker = new RoaringBitmap();
}
public long getPosition() {
return position;
}
public void setPosition(long position) {
logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId,
appShuffleId.shuffleId, reduceId, this.position, position);
this.position = position;
}
boolean isEncounteredFailure() {
return encounteredFailure;
}
void setEncounteredFailure(boolean encounteredFailure) {
this.encounteredFailure = encounteredFailure;
}
int getCurrentMapIndex() {
return currentMapIndex;
}
void setCurrentMapIndex(int mapIndex) {
logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, currentMapIndex, mapIndex);
this.currentMapIndex = mapIndex;
}
long getLastChunkOffset() {
return lastChunkOffset;
}
void blockMerged(int mapIndex) {
logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appShuffleId.appId,
appShuffleId.shuffleId, reduceId, mapIndex);
mapTracker.add(mapIndex);
chunkTracker.add(mapIndex);
lastMergedMapIndex = mapIndex;
}
void resetChunkTracker() {
chunkTracker.clear();
}
/**
* Appends the chunk offset to the index file and adds the map index to the chunk tracker.
*
* @param chunkOffset the offset of the chunk in the data file.
* @param mapIndex the map index to be added to chunk tracker.
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
long idxStartPos = -1;
try {
// update the chunk tracker to meta file before index file
writeChunkTracker(mapIndex);
idxStartPos = indexFile.getFilePointer();
logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset,
chunkOffset);
indexFile.writeLong(chunkOffset);
} catch (IOException ioe) {
if (idxStartPos != -1) {
// reset the position to avoid corrupting index files during exception.
logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
indexFile.seek(idxStartPos);
}
throw ioe;
}
this.lastChunkOffset = chunkOffset;
}
private void writeChunkTracker(int mapIndex) throws IOException {
if (mapIndex == -1) {
return;
}
chunkTracker.add(mapIndex);
long metaStartPos = metaFile.getFilePointer();
try {
logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
chunkTracker.serialize(metaFile);
} catch (IOException ioe) {
logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of meta file to {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, metaStartPos);
metaFile.seek(metaStartPos);
throw ioe;
}
}
void closeAllFiles() {
if (dataChannel != null) {
try {
dataChannel.close();
} catch (IOException ioe) {
logger.warn("Error closing data channel for {} shuffleId {} reduceId {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId);
} finally {
dataChannel = null;
}
}
if (metaFile != null) {
try {
// if the stream is closed, the channel gets closed as well.
metaFile.close();
} catch (IOException ioe) {
logger.warn("Error closing meta file for {} shuffleId {} reduceId {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId);
} finally {
metaFile = null;
}
}
if (indexFile != null) {
try {
indexFile.close();
} catch (IOException ioe) {
logger.warn("Error closing index file for {} shuffleId {} reduceId {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId);
} finally {
indexFile = null;
}
}
}
@Override
protected void finalize() throws Throwable {
closeAllFiles();
}
}
/**
* Wraps all the information related to the merge directory of an application.
*/
private static class AppPathsInfo {
private final String[] activeLocalDirs;
private final int subDirsPerLocalDir;
private AppPathsInfo(
String appId,
String[] localDirs,
int subDirsPerLocalDir) {
activeLocalDirs = Arrays.stream(localDirs)
.map(localDir ->
// Merge directory is created at the same level as block-manager directory. The list of
// local directories that we get from executorShuffleInfo are paths of each
// block-manager directory. To find out the merge directory location, we first find the
// parent dir and then append the "merge_manager" directory to it.
Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath())
.toArray(String[]::new);
this.subDirsPerLocalDir = subDirsPerLocalDir;
if (logger.isInfoEnabled()) {
logger.info("Updated active local dirs {} and sub dirs {} for application {}",
Arrays.toString(activeLocalDirs), subDirsPerLocalDir, appId);
}
}
}
}

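To make the chunking arithmetic concrete, here is a toy walkthrough (block sizes made up) of how `onComplete` and `finalizeShuffleMerge` draw chunk boundaries against `minChunkSize`, and what ends up in the `mergedShuffle_<appId>_<shuffleId>_<reduceId>.index` file:

```java
class ChunkBoundarySketch {
  public static void main(String[] args) {
    long minChunkSize = 2 * 1024 * 1024; // the 2m default from TransportConf
    long[] mergedBlockSizes = {1_500_000, 800_000, 2_500_000, 100_000};
    long pos = 0;
    long lastChunkOffset = 0;
    System.out.println("index entry: 0"); // first entry is always the zero offset
    for (long size : mergedBlockSizes) {
      pos += size; // a block was fully appended, as in onComplete
      if (pos - lastChunkOffset >= minChunkSize) {
        System.out.println("index entry: " + pos); // updateChunkInfo
        lastChunkOffset = pos;
      }
    }
    if (pos != lastChunkOffset) {
      System.out.println("index entry: " + pos); // written during finalize
    }
    // Prints offsets 0, 2300000, 4800000, 4900000. With N+1 offsets in the
    // index file, getMergedBlockMeta reports numChunks = (length / 8) - 1 = 3.
  }
}
```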
File: FinalizeShuffleMerge.java

@@ -25,6 +25,8 @@ import org.apache.spark.network.protocol.Encoders;
/**
* Request to finalize merge for a given shuffle.
* Returns {@link MergeStatuses}
*
* @since 3.1.0
*/
public class FinalizeShuffleMerge extends BlockTransferMessage {
public final String appId;

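For orientation, a hedged sketch of the finalize flow; it assumes an `(appId, shuffleId)` constructor, consistent with the `msg.appId`/`msg.shuffleId` usage in `RemoteBlockPushResolver#finalizeShuffleMerge`:

```java
import java.io.IOException;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;

class FinalizeSketch {
  // Hypothetical: ask the merge manager to finalize shuffle 5 of app-1.
  static MergeStatuses finalize(MergedShuffleFileManager mergeManager) throws IOException {
    return mergeManager.finalizeShuffleMerge(new FinalizeShuffleMerge("app-1", 5));
  }
}
```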
File: MergeStatuses.java

@@ -32,6 +32,8 @@ import org.apache.spark.network.protocol.Encoders;
* the set of mapper partition blocks that are merged for a given reducer partition, an array
* of reducer IDs, and an array of merged shuffle partition sizes. The 3 arrays list information
* about all the reducer partitions merged by the ExternalShuffleService in the same order.
*
* @since 3.1.0
*/
public class MergeStatuses extends BlockTransferMessage {
/** Shuffle ID **/

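The three parallel arrays are easiest to see with a small made-up instance; the constructor shape is taken from the `finalizeShuffleMerge` implementation above:

```java
import org.roaringbitmap.RoaringBitmap;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;

class MergeStatusesSketch {
  static MergeStatuses example() {
    RoaringBitmap reduce0Maps = new RoaringBitmap();
    reduce0Maps.add(0); // map indices 0 and 2 were merged into reduce partition 0
    reduce0Maps.add(2);
    RoaringBitmap reduce1Maps = new RoaringBitmap();
    reduce1Maps.add(1); // map index 1 was merged into reduce partition 1
    return new MergeStatuses(
        5,                                              // shuffle ID
        new RoaringBitmap[] {reduce0Maps, reduce1Maps}, // per-reducer map bitmaps
        new int[] {0, 1},                               // reducer IDs
        new long[] {4096L, 2048L});                     // merged partition sizes in bytes
  }
}
```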
File: PushBlockStream.java

@@ -23,23 +23,27 @@ import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;
// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
/**
* Request to push a block to a remote shuffle service to be merged in push based shuffle.
* The remote shuffle service will also include this message when responding to push requests.
*
* @since 3.1.0
*/
public class PushBlockStream extends BlockTransferMessage {
public final String appId;
public final String blockId;
public final int shuffleId;
public final int mapIndex;
public final int reduceId;
// Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
// blocks to be pushed.
public final int index;
public PushBlockStream(String appId, String blockId, int index) {
public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
this.appId = appId;
this.blockId = blockId;
this.shuffleId = shuffleId;
this.mapIndex = mapIndex;
this.reduceId = reduceId;
this.index = index;
}
@@ -50,14 +54,16 @@ public class PushBlockStream extends BlockTransferMessage {
@Override
public int hashCode() {
return Objects.hashCode(appId, blockId, index);
return Objects.hashCode(appId, shuffleId, mapIndex, reduceId, index);
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("appId", appId)
.add("blockId", blockId)
.add("shuffleId", shuffleId)
.add("mapIndex", mapIndex)
.add("reduceId", reduceId)
.add("index", index)
.toString();
}
@@ -67,7 +73,9 @@ public class PushBlockStream extends BlockTransferMessage {
if (other != null && other instanceof PushBlockStream) {
PushBlockStream o = (PushBlockStream) other;
return Objects.equal(appId, o.appId)
&& Objects.equal(blockId, o.blockId)
&& shuffleId == o.shuffleId
&& mapIndex == o.mapIndex
&& reduceId == o.reduceId
&& index == o.index;
}
return false;
@ -75,21 +83,24 @@ public class PushBlockStream extends BlockTransferMessage {
@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId)
+ Encoders.Strings.encodedLength(blockId) + 4;
return Encoders.Strings.encodedLength(appId) + 16;
}
@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
Encoders.Strings.encode(buf, blockId);
buf.writeInt(shuffleId);
buf.writeInt(mapIndex);
buf.writeInt(reduceId);
buf.writeInt(index);
}
public static PushBlockStream decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
String blockId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int mapIdx = buf.readInt();
int reduceId = buf.readInt();
int index = buf.readInt();
return new PushBlockStream(appId, blockId, index);
return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index);
}
}
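The new wire format is a length-prefixed appId followed by four 4-byte ints, which is why encodedLength() above is the appId length plus 16. A minimal round-trip sketch using the types from this patch (the field values mirror the updated tests below):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;

public class PushBlockStreamRoundTrip {
  public static void main(String[] args) {
    // appId, shuffleId, mapIndex, reduceId, index
    PushBlockStream msg = new PushBlockStream("app-id", 0, 1, 0, 1);
    ByteBuf buf = Unpooled.buffer(msg.encodedLength());
    msg.encode(buf);
    // Every byte written is accounted for by encodedLength().
    System.out.println(buf.readableBytes() == msg.encodedLength()); // true
    // Decoding the same buffer yields an equal message.
    System.out.println(PushBlockStream.decode(buf).equals(msg));    // true
  }
}

Dropping the explicit blockId works because the id is now derivable from the remaining fields; the updated tests below exercise block ids of the form shufflePush_<shuffleId>_<mapIndex>_<reduceId>.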

View file

@ -77,7 +77,7 @@ public class ExternalBlockHandlerSuite {
ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();
handler.receive(client, registerMessage, callback);
verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
verify(mergedShuffleManager, times(1)).registerExecutor("app0", localDirs);
verify(mergedShuffleManager, times(1)).registerExecutor("app0", config);
verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
verify(callback, never()).onFailure(any(Throwable.class));

View file

@ -45,77 +45,77 @@ public class OneForOneBlockPusherSuite {
@Test
public void testPushOne() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "shuffle_0_0_0", 0)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));
verify(listener).onBlockFetchSuccess(eq("shuffle_0_0_0"), any());
verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
}
@Test
public void testPushThree() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
blocks.put("shufflePush_0_2_0", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
for (int i = 0; i < 3; i ++) {
verify(listener, times(1)).onBlockFetchSuccess(eq("b" + i), any());
}
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_2_0"), any());
}
@Test
public void testServerFailures() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
}
@Test
public void testHandlingRetriableFailures() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", null);
blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", null);
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
verify(listener, times(0)).onBlockFetchSuccess(not(eq("b0")), any());
verify(listener, times(0)).onBlockFetchFailure(eq("b0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
verify(listener, times(0)).onBlockFetchFailure(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
verify(listener, times(2)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
}
/**

View file

@ -0,0 +1,496 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
/**
* Tests for {@link RemoteBlockPushResolver}.
*/
public class RemoteBlockPushResolverSuite {
private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
private final String TEST_APP = "testApp";
private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401";
private TransportConf conf;
private RemoteBlockPushResolver pushResolver;
private Path[] localDirs;
@Before
public void before() throws IOException {
localDirs = createLocalDirs(2);
MapConfigProvider provider = new MapConfigProvider(
ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4"));
conf = new TransportConf("shuffle", provider);
pushResolver = new RemoteBlockPushResolver(conf);
registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
}
@After
public void after() {
try {
for (Path local : localDirs) {
FileUtils.deleteDirectory(local.toFile());
}
removeApplication(TEST_APP);
} catch (Exception e) {
// Don't fail the test if cleanup doesn't succeed.
log.debug("Error while tearing down", e);
}
}
@Test(expected = RuntimeException.class)
public void testNoIndexFile() {
try {
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
} catch (Throwable t) {
assertTrue(t.getMessage().startsWith("Merged shuffle index file"));
Throwables.propagate(t);
}
}
@Test
public void testBasicBlockMerge() throws IOException {
PushBlock[] pushBlocks = new PushBlock[] {
new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4])),
new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[5]))
};
pushBlockHelper(TEST_APP, pushBlocks);
MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
new FinalizeShuffleMerge(TEST_APP, 0));
validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}});
}
@Test
public void testDividingMergedBlocksIntoChunks() throws IOException {
PushBlock[] pushBlocks = new PushBlock[] {
new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[2])),
new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[3])),
new PushBlock(0, 2, 0, ByteBuffer.wrap(new byte[5])),
new PushBlock(0, 3, 0, ByteBuffer.wrap(new byte[3]))
};
pushBlockHelper(TEST_APP, pushBlocks);
MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
new FinalizeShuffleMerge(TEST_APP, 0));
validateMergeStatuses(statuses, new int[] {0}, new long[] {13});
MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, meta, new int[]{5, 5, 3}, new int[][]{{0, 1}, {2}, {3}});
}
@Test
public void testFinalizeWithMultipleReducePartitions() throws IOException {
PushBlock[] pushBlocks = new PushBlock[] {
new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[2])),
new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[3])),
new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
};
pushBlockHelper(TEST_APP, pushBlocks);
MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
new FinalizeShuffleMerge(TEST_APP, 0));
validateMergeStatuses(statuses, new int[] {0, 1}, new long[] {5, 8});
MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, meta, new int[]{5}, new int[][]{{0, 1}});
}
@Test
public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
// stream 2 has more data and then completes
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}});
}
@Test
public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
// stream 2 now completes
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}});
}
@Test
public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
// This should be ignored
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
}
@Test
public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
// This should be ignored
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
// stream 2 now completes
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
}
@Test
public void testFailureAfterData() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
}
@Test
public void testFailureAfterMultipleDataBlocks() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
}
@Test
public void testFailureAfterComplete() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onComplete(stream.getID());
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
}
@Test (expected = RuntimeException.class)
public void testTooLateArrival() throws IOException {
ByteBuffer[] blocks = new ByteBuffer[]{
ByteBuffer.wrap(new byte[4]),
ByteBuffer.wrap(new byte[5])
};
StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 0, 0, 0));
for (ByteBuffer block : blocks) {
stream.onData(stream.getID(), block);
}
stream.onComplete(stream.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 1, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
try {
stream1.onComplete(stream1.getID());
} catch (RuntimeException re) {
assertEquals(
"Block shufflePush_0_1_0 received after merged shuffle is finalized",
re.getMessage());
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
throw re;
}
}
@Test
public void testIncompleteStreamsAreOverwritten() throws IOException {
registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
// There is a failure
stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{5}, new int[][]{{1}});
}
@Test (expected = RuntimeException.class)
public void testCollision() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
// Since stream2 never got an opportunity to write, onComplete should throw the
// "couldn't find an opportunity" error.
try {
stream2.onComplete(stream2.getID());
} catch (RuntimeException re) {
assertEquals(
"Couldn't find an opportunity to write block shufflePush_0_1_0 to merged shuffle",
re.getMessage());
throw re;
}
}
@Test (expected = RuntimeException.class)
public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException {
StreamCallbackWithID stream1 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
StreamCallbackWithID stream2 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
// There is a failure with stream2
stream2.onFailure(stream2.getID(), new RuntimeException("forced error"));
StreamCallbackWithID stream3 =
pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
// This should be deferred
stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
// Since this stream never got an opportunity to write, onComplete should throw the
// "couldn't find an opportunity" error.
RuntimeException failedEx = null;
try {
stream3.onComplete(stream3.getID());
} catch (RuntimeException re) {
assertEquals(
"Couldn't find an opportunity to write block shufflePush_0_2_0 to merged shuffle",
re.getMessage());
failedEx = re;
}
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4}, new int[][] {{0}});
if (failedEx != null) {
throw failedEx;
}
}
@Test(expected = NullPointerException.class)
public void testUpdateLocalDirsOnlyOnce() throws IOException {
String testApp = "updateLocalDirsOnlyOnceTest";
Path[] activeLocalDirs = createLocalDirs(1);
registerExecutor(testApp, prepareLocalDirs(activeLocalDirs));
assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
activeLocalDirs[0].toFile().getPath()));
// Any later executor registration from the same application should not change the active
// local dirs list.
Path[] updatedLocalDirs = localDirs;
registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs));
assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
activeLocalDirs[0].toFile().getPath()));
removeApplication(testApp);
try {
pushResolver.getMergedBlockDirs(testApp);
} catch (Throwable e) {
assertTrue(e.getMessage()
.startsWith("application " + testApp + " is not registered or NM was restarted."));
Throwables.propagate(e);
}
}
@Test
public void testCleanUpDirectory() throws IOException, InterruptedException {
String testApp = "cleanUpDirectory";
Semaphore deleted = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void deleteExecutorDirs(Path[] dirs) {
super.deleteExecutorDirs(dirs);
deleted.release();
}
};
Path[] activeDirs = createLocalDirs(1);
registerExecutor(testApp, prepareLocalDirs(activeDirs));
PushBlock[] pushBlocks = new PushBlock[] {
new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4]))};
pushBlockHelper(testApp, pushBlocks);
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 0);
validateChunks(testApp, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
String[] mergeDirs = pushResolver.getMergedBlockDirs(testApp);
pushResolver.applicationRemoved(testApp, true);
// Since the cleanup happens in a different thread, wait for it to complete before checking
// that the merge dirs were deleted.
deleted.acquire();
for (String mergeDir : mergeDirs) {
Assert.assertFalse(Files.exists(Paths.get(mergeDir)));
}
}
private Path[] createLocalDirs(int numLocalDirs) throws IOException {
Path[] localDirs = new Path[numLocalDirs];
for (int i = 0; i < localDirs.length; i++) {
localDirs[i] = Files.createTempDirectory("shuffleMerge");
localDirs[i].toFile().deleteOnExit();
}
return localDirs;
}
private void registerExecutor(String appId, String[] localDirs) throws IOException {
ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, "mergedShuffle");
pushResolver.registerExecutor(appId, shuffleInfo);
}
private String[] prepareLocalDirs(Path[] localDirs) throws IOException {
String[] blockMgrDirs = new String[localDirs.length];
for (int i = 0; i < localDirs.length; i++) {
Files.createDirectories(localDirs[i].resolve(
RemoteBlockPushResolver.MERGE_MANAGER_DIR + File.separator + "00"));
blockMgrDirs[i] = localDirs[i].toFile().getPath() + File.separator + BLOCK_MANAGER_DIR;
}
return blockMgrDirs;
}
private void removeApplication(String appId) {
// The push resolver cleans up the local dirs in a different thread, which can conflict with
// the test data of other tests since they use the same application id.
pushResolver.applicationRemoved(appId, false);
}
private void validateMergeStatuses(
MergeStatuses mergeStatuses,
int[] expectedReduceIds,
long[] expectedSizes) {
assertArrayEquals(expectedReduceIds, mergeStatuses.reduceIds);
assertArrayEquals(expectedSizes, mergeStatuses.sizes);
}
private void validateChunks(
String appId,
int shuffleId,
int reduceId,
MergedBlockMeta meta,
int[] expectedSizes,
int[][] expectedMapsPerChunk) throws IOException {
assertEquals("num chunks", expectedSizes.length, meta.getNumChunks());
RoaringBitmap[] bitmaps = meta.readChunkBitmaps();
assertEquals("num of bitmaps", meta.getNumChunks(), bitmaps.length);
for (int i = 0; i < meta.getNumChunks(); i++) {
RoaringBitmap chunkBitmap = bitmaps[i];
Arrays.stream(expectedMapsPerChunk[i]).forEach(x -> assertTrue(chunkBitmap.contains(x)));
}
for (int i = 0; i < meta.getNumChunks(); i++) {
FileSegmentManagedBuffer mb =
(FileSegmentManagedBuffer) pushResolver.getMergedBlockData(appId, shuffleId, reduceId, i);
assertEquals(expectedSizes[i], mb.getLength());
}
}
private void pushBlockHelper(
String appId,
PushBlock[] blocks) throws IOException {
for (int i = 0; i < blocks.length; i++) {
StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
new PushBlockStream(appId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0));
stream.onData(stream.getID(), blocks[i].buffer);
stream.onComplete(stream.getID());
}
}
private static class PushBlock {
private final int shuffleId;
private final int mapIndex;
private final int reduceId;
private final ByteBuffer buffer;
PushBlock(int shuffleId, int mapIndex, int reduceId, ByteBuffer buffer) {
this.shuffleId = shuffleId;
this.mapIndex = mapIndex;
this.reduceId = reduceId;
this.buffer = buffer;
}
}
}

View file

@ -41,6 +41,7 @@ import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@ -172,7 +173,10 @@ public class YarnShuffleService extends AuxiliaryService {
}
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalBlockHandler(transportConf, registeredExecutorFile);
MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
transportConf);
blockHandler = new ExternalBlockHandler(
transportConf, registeredExecutorFile, shuffleMergeManager);
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
@ -219,6 +223,23 @@ public class YarnShuffleService extends AuxiliaryService {
}
}
@VisibleForTesting
static MergedShuffleFileManager newMergedShuffleFileManagerInstance(TransportConf conf) {
String mergeManagerImplClassName = conf.mergedShuffleFileManagerImpl();
try {
Class<?> mergeManagerImplClazz = Class.forName(
mergeManagerImplClassName, true, Thread.currentThread().getContextClassLoader());
Class<? extends MergedShuffleFileManager> mergeManagerSubClazz =
mergeManagerImplClazz.asSubclass(MergedShuffleFileManager.class);
// The assumption is that all custom implementations, just like RemoteBlockPushResolver,
// will also need the transport configuration.
return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
} catch (Exception e) {
logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
}
}
private void loadSecretsFromDb() throws IOException {
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
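The only contract newMergedShuffleFileManagerInstance above imposes on a custom implementation is a public constructor taking a TransportConf; any failure falls back to the no-op manager. A self-contained sketch of the same lookup-with-fallback pattern, using hypothetical stand-in types rather than the real Spark interfaces:

public class ReflectiveFactorySketch {
  interface Manager {}
  static class Conf {}

  static class NoOpManager implements Manager {
    NoOpManager(Conf conf) {}
  }

  public static class CustomManager implements Manager {
    public CustomManager(Conf conf) {}
  }

  static Manager newManagerInstance(String className, Conf conf) {
    try {
      Class<? extends Manager> clazz = Class
          .forName(className, true, Thread.currentThread().getContextClassLoader())
          .asSubclass(Manager.class);
      // Requires a public single-argument constructor taking the conf.
      return clazz.getConstructor(Conf.class).newInstance(conf);
    } catch (Exception e) {
      // Mirror YarnShuffleService: fall back to the no-op implementation.
      return new NoOpManager(conf);
    }
  }

  public static void main(String[] args) {
    Conf conf = new Conf();
    // Resolves the nested class by its binary name.
    System.out.println(newManagerInstance("ReflectiveFactorySketch$CustomManager", conf).getClass());
    // An unknown class name triggers the fallback.
    System.out.println(newManagerInstance("no.such.Class", conf).getClass());
  }
}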

View file

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.yarn;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.shuffle.RemoteBlockPushResolver;
import org.apache.spark.network.util.TransportConf;
public class YarnShuffleServiceSuite {
@Test
public void testCreateDefaultMergedShuffleFileManagerInstance() {
TransportConf mockConf = mock(TransportConf.class);
when(mockConf.mergedShuffleFileManagerImpl()).thenReturn(
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
MergedShuffleFileManager mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(
mockConf);
assertTrue(mergeMgr instanceof ExternalBlockHandler.NoOpMergedShuffleFileManager);
}
@Test
public void testCreateRemoteBlockPushResolverInstance() {
TransportConf mockConf = mock(TransportConf.class);
when(mockConf.mergedShuffleFileManagerImpl()).thenReturn(
"org.apache.spark.network.shuffle.RemoteBlockPushResolver");
MergedShuffleFileManager mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(
mockConf);
assertTrue(mergeMgr instanceof RemoteBlockPushResolver);
}
@Test
public void testInvalidClassNameOfMergeManagerWillUseNoOpInstance() {
TransportConf mockConf = mock(TransportConf.class);
when(mockConf.mergedShuffleFileManagerImpl()).thenReturn(
"org.apache.spark.network.shuffle.NotExistent");
MergedShuffleFileManager mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(
mockConf);
assertTrue(mergeMgr instanceof ExternalBlockHandler.NoOpMergedShuffleFileManager);
}
}