[SPARK-15074][SHUFFLE] Cache shuffle index file to speedup shuffle fetch

## What changes were proposed in this pull request?

Shuffle fetch on a large intermediate dataset is slow because the shuffle service opens and closes the index file for each shuffle fetch. This change introduces a cache for the index information so that we can avoid accessing the index files for each block fetch.
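For context, each sort-based map output has a companion `.index` file that is simply a sequence of 8-byte longs: entry `i` is the byte offset at which reduce partition `i` starts in the `.data` file, so block `i` spans `[offsets[i], offsets[i+1])`. Below is a minimal sketch of the lookup the shuffle service previously repeated on every fetch, based on the code removed in this diff (names are illustrative, not the exact Spark code):

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

class OldIndexLookup {
  /** Returns {offset, length} of reduce partition reduceId within the .data file. */
  static long[] readIndexEntry(File indexFile, int reduceId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      in.skipBytes(reduceId * 8);       // seek to this reducer's slot
      long offset = in.readLong();      // where this block starts
      long nextOffset = in.readLong();  // where the next block starts
      return new long[] {offset, nextOffset - offset};
    }
  }
}
```

Caching the parsed offsets in memory means a popular map output's index file is read once, not once per reducer.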

## How was this patch tested?

Tested by running a job on the cluster; the shuffle read time was reduced by 50%.

Author: Sital Kedia <skedia@fb.com>

Closes #12944 from sitalkedia/shuffle_service.
Commit 9c15d079df (parent 0e2e5d7d0b), authored by Sital Kedia on 2016-08-04 14:54:38 -07:00, committed by Josh Rosen.
5 changed files with 138 additions and 12 deletions.

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

@@ -60,6 +60,10 @@ public class TransportConf {
SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
}
public int getInt(String name, int defaultValue) {
return conf.getInt(name, defaultValue);
}
private String getConfKey(String suffix) {
return "spark." + module + "." + suffix;
}
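The new `getInt` accessor is what lets the shuffle service read arbitrary integer settings; the resolver below uses it to size its index cache. A minimal sketch of how it behaves, assuming TransportConf's two-argument `(module, ConfigProvider)` constructor and ConfigProvider's fall-back-to-default lookup; the empty provider and class name here are illustrative:

```java
import java.util.NoSuchElementException;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportConfDemo {
  public static void main(String[] args) {
    // An empty provider: every lookup misses, so getInt returns the default.
    ConfigProvider empty = new ConfigProvider() {
      @Override
      public String get(String name) {
        throw new NoSuchElementException(name);
      }
    };
    TransportConf conf = new TransportConf("shuffle", empty);
    // The key and default used by the resolver below to size its index cache.
    System.out.println(conf.getInt("spark.shuffle.service.index.cache.entries", 1024)); // 1024
  }
}
```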

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -29,6 +30,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
@@ -66,6 +70,12 @@ public class ExternalShuffleBlockResolver {
@VisibleForTesting
final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
/**
* Caches index file information so that we can avoid opening/closing the index
* files for each block fetch.
*/
private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
@@ -95,6 +105,15 @@
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.registeredExecutorFile = registeredExecutorFile;
int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumSize(indexCacheEntries).build(indexCacheLoader);
if (registeredExecutorFile != null) {
Options options = new Options();
options.createIfMissing(false);
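For readers unfamiliar with Guava's `LoadingCache`, the pattern above is: a `CacheLoader` that knows how to build a value on a miss, plus a size-bounded cache that serves hits from memory and evicts old entries. The resolver keys its cache by `File` and values by `ShuffleIndexInformation`; operators can raise the bound via the new `spark.shuffle.service.index.cache.entries` setting documented at the end of this diff. A self-contained sketch of the same pattern with toy key/value types (only the Guava calls mirror the patch):

```java
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheDemo {
  public static void main(String[] args) throws ExecutionException {
    // The loader runs only on a cache miss; entries are evicted once the
    // size bound is exceeded, mirroring maximumSize(indexCacheEntries).
    LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
        .maximumSize(1024)
        .build(new CacheLoader<String, Integer>() {
          @Override
          public Integer load(String key) {
            System.out.println("miss, loading: " + key);
            return key.length();
          }
        });

    cache.get("shuffle_0_1_0.index"); // miss: load() runs
    cache.get("shuffle_0_1_0.index"); // hit: served from memory, no load()
  }
}
```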
@@ -265,24 +284,17 @@
     File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
       "shuffle_" + shuffleId + "_" + mapId + "_0.index");
-    DataInputStream in = null;
     try {
-      in = new DataInputStream(new FileInputStream(indexFile));
-      in.skipBytes(reduceId * 8);
-      long offset = in.readLong();
-      long nextOffset = in.readLong();
+      ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
+      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
         getFile(executor.localDirs, executor.subDirsPerLocalDir,
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
-        offset,
-        nextOffset - offset);
-    } catch (IOException e) {
+        shuffleIndexRecord.getOffset(),
+        shuffleIndexRecord.getLength());
+    } catch (ExecutionException e) {
       throw new RuntimeException("Failed to open file: " + indexFile, e);
-    } finally {
-      if (in != null) {
-        JavaUtils.closeQuietly(in);
-      }
     }
   }
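The catch clause changes from `IOException` to `ExecutionException` because `LoadingCache.get` wraps whatever the loader throws; the `IOException` from the `ShuffleIndexInformation` constructor travels as the cause. A minimal, self-contained sketch of that wrapping (toy key/value types and an illustrative message):

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class WrappedExceptionDemo {
  public static void main(String[] args) {
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String key) throws IOException {
            throw new IOException("cannot open " + key); // loader fails
          }
        });
    try {
      cache.get("shuffle_0_0_0.index");
    } catch (ExecutionException e) {
      // The loader's checked exception arrives wrapped as the cause.
      System.out.println(e.getCause()); // java.io.IOException: cannot open ...
    }
  }
}
```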

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
/**
* Keeps the index information for a particular map output
* as an in-memory LongBuffer.
*/
public class ShuffleIndexInformation {
/** offsets as long buffer */
private final LongBuffer offsets;
public ShuffleIndexInformation(File indexFile) throws IOException {
int size = (int)indexFile.length();
ByteBuffer buffer = ByteBuffer.allocate(size);
offsets = buffer.asLongBuffer();
DataInputStream dis = null;
try {
dis = new DataInputStream(new FileInputStream(indexFile));
dis.readFully(buffer.array());
} finally {
if (dis != null) {
dis.close();
}
}
}
/**
* Get the index record (offset and length in the data file) for a particular reducer.
*/
public ShuffleIndexRecord getIndex(int reduceId) {
long offset = offsets.get(reduceId);
long nextOffset = offsets.get(reduceId + 1);
return new ShuffleIndexRecord(offset, nextOffset - offset);
}
}
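A quick usage sketch: write a toy index file with four partition boundaries and read one record back. The file name and offsets are made up for illustration, and the demo class is placed in the same package so it can see the new classes:

```java
package org.apache.spark.network.shuffle;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class ShuffleIndexInformationDemo {
  public static void main(String[] args) throws IOException {
    // A toy index for 3 reduce partitions with boundaries 0, 100, 250, 400.
    File indexFile = File.createTempFile("shuffle_0_0_0", ".index");
    indexFile.deleteOnExit();
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(indexFile))) {
      for (long boundary : new long[] {0L, 100L, 250L, 400L}) {
        out.writeLong(boundary); // big-endian, matching the LongBuffer view above
      }
    }

    ShuffleIndexInformation info = new ShuffleIndexInformation(indexFile);
    ShuffleIndexRecord record = info.getIndex(1);
    // Partition 1 spans [100, 250): prints offset=100 length=150
    System.out.println("offset=" + record.getOffset() + " length=" + record.getLength());
  }
}
```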

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
/**
* Contains offset and length of the shuffle block data.
*/
public class ShuffleIndexRecord {
private final long offset;
private final long length;
public ShuffleIndexRecord(long offset, long length) {
this.offset = offset;
this.length = length;
}
public long getOffset() {
return offset;
}
public long getLength() {
return length;
}
}

docs/configuration.md

@@ -521,6 +521,13 @@ Apart from these, the following properties are also available, and may be useful
Port on which the external shuffle service will run.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.index.cache.entries</code></td>
<td>1024</td>
<td>
Max number of entries to keep in the index cache of the shuffle service.
</td>
</tr>
<tr>
<td><code>spark.shuffle.sort.bypassMergeThreshold</code></td>
<td>200</td>