[SPARK-32149][SHUFFLE] Improve file path name normalisation at block resolution within the external shuffle service

### What changes were proposed in this pull request?

Improve file path name normalisation by removing the approximate, hand-rolled transformation from Spark and relying on the JDK's own path normalisation instead.

### Why are the changes needed?

In the external shuffle service, during block resolution, file paths (for disk-persisted RDD blocks and for shuffle blocks) are normalised by custom Spark code that uses an OS-dependent regexp. This code duplicates its package-private JDK counterpart, and since the two are not a perfect match, the two methods can even produce slightly different (but semantically equal) paths.

The reason for this redundant transformation is that the normalised path is interned to save some heap, which is only possible if both transformations produce the same string.

Looking at the JDK code, I believe there is a better solution that matches the JDK behaviour perfectly, as it goes through that very package-private method. Moreover, based on some benchmarking, the new method even seems to be more performant.
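
To illustrate the effect being relied on (a minimal, hypothetical sketch, not code from this PR): constructing a `java.io.File` already routes the pathname through the package-private `java.io.FileSystem` normalisation, so reading it back with `getPath()` yields exactly the JDK-normalised form, which can then be interned:

```java
import java.io.File;

public class NormalisationDemo {
  public static void main(String[] args) {
    // Hypothetical pathname with duplicate and trailing separators, as can
    // arise from naive concatenation of localDir + subDirId + filename.
    String raw = "/local1//0a///shuffle_0_0_0.data/";
    // The File constructor normalises the pathname via the package-private
    // java.io.FileSystem#normalize; getPath() hands the result back.
    String normalised = new File(raw).getPath();
    System.out.println(normalised);  // /local1/0a/shuffle_0_0_0.data (on Unix)
    // Interning lets every File for the same pathname share a single backing
    // String, which is where the heap saving comes from.
    String interned = normalised.intern();
    System.out.println(interned == normalised.intern());  // true
  }
}
```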

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

As we are reusing the JDK code for normalisation, no new test is needed; the existing test can even be removed.

But in a separate branch I have created a benchmark where the performance of the old and the new solution can be compared. It shows the new method is about 7-10 times faster than the old one.
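
The benchmark itself lives only in that separate branch; purely as a sketch of the shape of the comparison (hypothetical inputs and a crude nanoTime loop in place of the real harness — a proper measurement would use JMH to avoid JIT dead-code elimination):

```java
import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PathNormalisationBenchSketch {
  // The Unix flavour of the old custom regexp.
  private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile("/{2,}");

  public static void main(String[] args) {
    String raw = "/local1//0a///shuffle_0_0_0.data";
    int n = 1_000_000;
    String sink = null;  // keep results reachable to limit dead-code elimination

    long t0 = System.nanoTime();
    for (int i = 0; i < n; i++) {
      Matcher m = MULTIPLE_SEPARATORS.matcher(raw);
      sink = m.replaceAll(Matcher.quoteReplacement(File.separator));  // old way
    }
    long regexNanos = System.nanoTime() - t0;

    long t1 = System.nanoTime();
    for (int i = 0; i < n; i++) {
      sink = new File(raw).getPath();  // new way: JDK normalisation
    }
    long fileNanos = System.nanoTime() - t1;

    System.out.printf("regex: %d ms, File.getPath(): %d ms (last: %s)%n",
        regexNanos / 1_000_000, fileNanos / 1_000_000, sink);
  }
}
```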

Closes #28967 from attilapiros/SPARK-32149.

Authored-by: "attilapiros" <piros.attila.zsolt@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
3 changed files with 10 additions and 72 deletions

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java

@@ -18,25 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.SystemUtils;
-
 import org.apache.spark.network.util.JavaUtils;
 
 public class ExecutorDiskUtils {
 
-  private static final Pattern MULTIPLE_SEPARATORS;
-  static {
-    if (SystemUtils.IS_OS_WINDOWS) {
-      MULTIPLE_SEPARATORS = Pattern.compile("[/\\\\]+");
-    } else {
-      MULTIPLE_SEPARATORS = Pattern.compile("/{2,}");
-    }
-  }
-
   /**
    * Hashes a filename into the corresponding local directory, in a manner consistent with
    * Spark's DiskBlockManager.getFile().
@@ -45,34 +31,16 @@ public class ExecutorDiskUtils {
     int hash = JavaUtils.nonNegativeHash(filename);
     String localDir = localDirs[hash % localDirs.length];
     int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
-    return new File(createNormalizedInternedPathname(
-        localDir, String.format("%02x", subDirId), filename));
-  }
-
-  /**
-   * This method is needed to avoid the situation when multiple File instances for the
-   * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
-   * According to measurements, in some scenarios such duplicate strings may waste a lot
-   * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
-   * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
-   * the internal code in java.io.File would normalize it later, creating a new "foo/bar"
-   * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
-   * uses, since it is in the package-private class java.io.FileSystem.
-   *
-   * On Windows, separator "\" is used instead of "/".
-   *
-   * "\\" is a legal character in path name on Unix-like OS, but illegal on Windows.
-   */
-  @VisibleForTesting
-  static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
-    String pathname = dir1 + File.separator + dir2 + File.separator + fname;
-    Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
-    pathname = m.replaceAll(Matcher.quoteReplacement(File.separator));
-    // A single trailing slash needs to be taken care of separately
-    if (pathname.length() > 1 && pathname.charAt(pathname.length() - 1) == File.separatorChar) {
-      pathname = pathname.substring(0, pathname.length() - 1);
-    }
-    return pathname.intern();
+    final String notNormalizedPath =
+        localDir + File.separator + String.format("%02x", subDirId) + File.separator + filename;
+    // Interning the normalized path as according to measurements, in some scenarios such
+    // duplicate strings may waste a lot of memory (~ 10% of the heap).
+    // Unfortunately, we cannot just call the normalization code that java.io.File
+    // uses, since it is in the package-private class java.io.FileSystem.
+    // So we are creating a File just to get the normalized path back to intern it.
+    // Finally a new File is built and returned with this interned normalized path.
+    final String normalizedInternedPath = new File(notNormalizedPath).getPath().intern();
+    return new File(normalizedInternedPath);
   }
 }

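For context, a hypothetical call against the rewritten method (the directory and file names below are made up, not part of the diff): redundant separators in a configured local dir are now collapsed by `java.io.File` itself before interning:

```java
import java.io.File;
import org.apache.spark.network.shuffle.ExecutorDiskUtils;

public class GetFileDemo {
  public static void main(String[] args) {
    // Hypothetical inputs; note the redundant trailing separator in the
    // configured local dir, which the File normalisation collapses.
    File dataFile = ExecutorDiskUtils.getFile(
        new String[] {"/tmp/spark-local//"}, 64, "shuffle_0_0_0.data");
    // The returned path is fully normalised and its String is interned, so
    // repeated resolutions of the same block share one String instance.
    System.out.println(dataFile.getPath());
  }
}
```
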
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -71,8 +70,6 @@ public class ExternalShuffleBlockResolver {
   private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
   private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
 
-  private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");
-
   // Map containing all registered executors' metadata.
   @VisibleForTesting
   final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

@@ -17,7 +17,6 @@
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
@@ -25,7 +24,6 @@ import java.nio.charset.StandardCharsets;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.CharStreams;
-import org.apache.commons.lang3.SystemUtils;
 
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
@@ -145,29 +143,4 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 
-  @Test
-  public void testNormalizeAndInternPathname() {
-    String sep = File.separator;
-    String expectedPathname = sep + "foo" + sep + "bar" + sep + "baz";
-    assertPathsMatch("/foo", "bar", "baz", expectedPathname);
-    assertPathsMatch("//foo/", "bar/", "//baz", expectedPathname);
-    assertPathsMatch("/foo/", "/bar//", "/baz", expectedPathname);
-    assertPathsMatch("foo", "bar", "baz///", "foo" + sep + "bar" + sep + "baz");
-    assertPathsMatch("/", "", "", sep);
-    assertPathsMatch("/", "/", "/", sep);
-    if (SystemUtils.IS_OS_WINDOWS) {
-      assertPathsMatch("/foo\\/", "bar", "baz", expectedPathname);
-    } else {
-      assertPathsMatch("/foo\\/", "bar", "baz", sep + "foo\\" + sep + "bar" + sep + "baz");
-    }
-  }
-
-  private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
-    String normPathname =
-        ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
-    assertEquals(expectedPathname, normPathname);
-    File file = new File(normPathname);
-    String returnedPath = file.getPath();
-    assertEquals(normPathname, returnedPath);
-  }
 }