[SPARK-27622][CORE] Avoiding the network when block manager fetches disk persisted RDD blocks from the same host

## What changes were proposed in this pull request?

Before this PR, fetching a disk-persisted RDD block always went over the network to get the requested block content, even when both the source and the fetcher executor were running on the same host.

The idea of accessing another executor's local disk files by reading the disk directly comes from the external shuffle service, where the local dirs are stored for each executor (block manager).

To make this possible, the following changes are made:
- The `RegisterBlockManager` message is extended with `localDirs`, which the block manager master stores for each block manager as a new property of `BlockManagerInfo`.
- The `GetLocationsAndStatus` message is extended with the requester's host.
- `BlockLocationsAndStatus` (the reply to the `GetLocationsAndStatus` message) is extended with an optional list of local directories, filled with the local directories of a same-host executor if there is one (otherwise None). This is where the block content can be read from, as the sketch below shows.
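
In a nutshell, the new fetch path looks like this (a condensed sketch of `BlockManager.getRemoteBlock` from this PR, not the verbatim implementation; error handling is folded into `Try`):

```scala
import scala.util.Try

private[spark] def getRemoteBlock[T](
    blockId: BlockId,
    bufferTransformer: ManagedBuffer => T): Option[T] = {
  master.getLocationsAndStatus(blockId, blockManagerId.host).flatMap { locationsAndStatus =>
    val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)
    locationsAndStatus.localDirs.flatMap { localDirs =>
      // A same-host executor persisted the block to disk: read the file directly.
      readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize)
        .flatMap(data => Try(bufferTransformer(data)).toOption)
    }.orElse {
      // No same-host disk copy (or the direct read failed): fetch over the network as before.
      fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer)
    }
  }
}
```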

Shuffle blocks are out of scope for this PR; a separate PR will be opened for them (under another Jira issue).

## How was this patch tested?

With a new unit test in `BlockManagerSuite`. See the tests prefixed by "SPARK-27622: avoid the network when block requested from same host".
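
The suite simulates two hosts within a single JVM by stubbing the host name of a spied `BlockTransferService` (excerpt from the new test code below):

```scala
val otherHostTransferSrv = spy(sameHostBm.blockTransferService)
doAnswer { _ => "otherHost" }.when(otherHostTransferSrv).hostName
val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv))
```

A second spy fails the test if `fetchBlockSync` is ever called, which proves the block was served from the local disk rather than over the network.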

Closes #24554 from attilapiros/SPARK-27622.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Authored by “attilapiros” on 2019-06-25 07:35:44 -07:00, committed by Marcelo Vanzin
parent 1a3858a769
commit b71c130fc6
11 changed files with 342 additions and 93 deletions

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;
import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.network.util.JavaUtils;
public class ExecutorDiskUtils {
private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");
/**
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/
public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
return new File(createNormalizedInternedPathname(
localDir, String.format("%02x", subDirId), filename));
}
/**
* This method is needed to avoid the situation when multiple File instances for the
* same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
* According to measurements, in some scenarios such duplicate strings may waste a lot
* of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
* we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
* the internal code in java.io.File would normalize it later, creating a new "foo/bar"
* String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
* uses, since it is in the package-private class java.io.FileSystem.
*/
@VisibleForTesting
static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
String pathname = dir1 + File.separator + dir2 + File.separator + fname;
Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
pathname = m.replaceAll("/");
// A single trailing slash needs to be taken care of separately
if (pathname.length() > 1 && pathname.endsWith("/")) {
pathname = pathname.substring(0, pathname.length() - 1);
}
return pathname.intern();
}
}
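
For illustration, a hypothetical Scala call site (the directory names are made up); the returned path has the shape `<localDir>/<two-hex-digit subdir>/<filename>`, normalized and interned as described in the comment above:

```scala
import org.apache.spark.network.shuffle.ExecutorDiskUtils

// With the default spark.diskStore.subDirectories (64), a given block name always
// hashes to the same local dir and sub-directory on every lookup.
val localDirs = Array("/tmp/spark-local-a", "/tmp/spark-local-b")
val file = ExecutorDiskUtils.getFile(localDirs, 64, "rdd_42_0")
println(file.getPath) // e.g. /tmp/spark-local-b/0b/rdd_42_0 (exact dirs depend on the hash)
```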

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -298,7 +297,7 @@ public class ExternalShuffleBlockResolver {
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
try {
@ -306,7 +305,7 @@ public class ExternalShuffleBlockResolver {
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
@ -317,7 +316,7 @@ public class ExternalShuffleBlockResolver {
public ManagedBuffer getDiskPersistedRddBlockData(
ExecutorShuffleInfo executor, int rddId, int splitIndex) {
File file = getFile(executor.localDirs, executor.subDirsPerLocalDir,
File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"rdd_" + rddId + "_" + splitIndex);
long fileLength = file.length();
ManagedBuffer res = null;
@ -327,19 +326,6 @@ public class ExternalShuffleBlockResolver {
return res;
}
/**
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/
@VisibleForTesting
static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
return new File(createNormalizedInternedPathname(
localDir, String.format("%02x", subDirId), filename));
}
void close() {
if (db != null) {
try {
@ -350,28 +336,6 @@ public class ExternalShuffleBlockResolver {
}
}
/**
* This method is needed to avoid the situation when multiple File instances for the
* same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
* According to measurements, in some scenarios such duplicate strings may waste a lot
* of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
* we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
* the internal code in java.io.File would normalize it later, creating a new "foo/bar"
* String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
* uses, since it is in the package-private class java.io.FileSystem.
*/
@VisibleForTesting
static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
String pathname = dir1 + File.separator + dir2 + File.separator + fname;
Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
pathname = m.replaceAll("/");
// A single trailing slash needs to be taken care of separately
if (pathname.length() > 1 && pathname.endsWith("/")) {
pathname = pathname.substring(0, pathname.length() - 1);
}
return pathname.intern();
}
public int removeBlocks(String appId, String execId, String[] blockIds) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
@ -380,7 +344,8 @@ public class ExternalShuffleBlockResolver {
}
int numRemovedBlocks = 0;
for (String blockId : blockIds) {
File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
File file =
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
if (file.delete()) {
numRemovedBlocks++;
} else {

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

@ -149,7 +149,7 @@ public class ExternalShuffleBlockResolverSuite {
private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
String normPathname =
ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
assertEquals(expectedPathname, normPathname);
File file = new File(normPathname);
String returnedPath = file.getPath();

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java

@ -76,9 +76,9 @@ public class TestShuffleDataContext {
try {
dataStream = new FileOutputStream(
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
indexStream = new DataOutputStream(new FileOutputStream(
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
long offset = 0;
indexStream.writeLong(offset);
@ -121,10 +121,11 @@ public class TestShuffleDataContext {
private void insertFile(String filename, byte[] block) throws IOException {
OutputStream dataStream = null;
File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename);
File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
assert(!file.exists()) : "this test file has already been generated";
try {
dataStream = new FileOutputStream(file);
dataStream = new FileOutputStream(
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
dataStream.write(block);
} finally {
Closeables.close(dataStream, false);

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@ -43,8 +43,9 @@ import org.apache.spark.internal.config.Network
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle._
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.network.util.TransportConf
@ -138,6 +139,8 @@ private[spark] class BlockManager(
private val remoteReadNioBufferConversion =
conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
@ -411,6 +414,7 @@ private[spark] class BlockManager(
val idFromMaster = master.registerBlockManager(
id,
diskBlockManager.localDirsString,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)
@ -445,7 +449,7 @@ private[spark] class BlockManager(
private def registerWithExternalShuffleServer() {
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirs.map(_.toString),
diskBlockManager.localDirsString,
diskBlockManager.subDirsPerLocalDir,
shuffleManager.getClass.getName)
@ -500,7 +504,8 @@ private[spark] class BlockManager(
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}
@ -827,10 +832,63 @@ private[spark] class BlockManager(
*/
private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
val ct = implicitly[ClassTag[T]]
getRemoteManagedBuffer(blockId).map { data =>
getRemoteBlock(blockId, (data: ManagedBuffer) => {
val values =
serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
new BlockResult(values, DataReadMethod.Network, data.size)
})
}
/**
* Get the remote block and transform it to the provided data type.
*
* If the block is persisted to disk and stored by an executor running on the same host, we
* first try to read it directly from the local directories of that executor. If the file is
* found, the provided transformation function (which is expected to open the file) is applied
* to it. If this transformation throws an exception, block access falls back to fetching the
* block from the remote executor via the network.
*
* @param blockId identifies the block to get
* @param bufferTransformer this transformer is expected to open the file when the block is
* backed by a file; this guarantees that the whole content can be loaded
* @tparam T result type
* @return
*/
private[spark] def getRemoteBlock[T](
blockId: BlockId,
bufferTransformer: ManagedBuffer => T): Option[T] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
// Because all the remote blocks are registered in driver, it is not necessary to ask
// all the slave executors to get block status.
val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host)
if (locationsAndStatusOption.isEmpty) {
logDebug(s"Block $blockId is unknown by block manager master")
None
} else {
val locationsAndStatus = locationsAndStatusOption.get
val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)
locationsAndStatus.localDirs.flatMap { localDirs =>
val blockDataOption =
readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize)
val res = blockDataOption.flatMap { blockData =>
try {
Some(bufferTransformer(blockData))
} catch {
case NonFatal(e) =>
logDebug("Block from the same host executor cannot be opened: ", e)
None
}
}
logInfo(s"Read $blockId from the disk of a same host executor is " +
(if (res.isDefined) "successful." else "failed."))
res
}.orElse {
fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer)
}
}
}
@ -861,22 +919,12 @@ private[spark] class BlockManager(
}
/**
* Get block from remote block managers as a ManagedBuffer.
* Fetch the block from remote block managers as a ManagedBuffer.
*/
private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
var totalFailureCount = 0
// Because all the remote blocks are registered in driver, it is not necessary to ask
// all the slave executors to get block status.
val locationsAndStatus = master.getLocationsAndStatus(blockId)
val blockSize = locationsAndStatus.map { b =>
b.status.diskSize.max(b.status.memSize)
}.getOrElse(0L)
val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
private def fetchRemoteManagedBuffer(
blockId: BlockId,
blockSize: Long,
locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = {
// If the block size is above the threshold, we should pass our FileManager to
// BlockTransferService, which will leverage it to spill the block; if not, then passed-in
// null value means the block will be persisted in memory.
@ -885,8 +933,9 @@ private[spark] class BlockManager(
} else {
null
}
val locations = sortLocations(blockLocations)
var runningFailureCount = 0
var totalFailureCount = 0
val locations = sortLocations(locationsAndStatus.locations)
val maxFetchFailures = locations.size
var locationIterator = locations.iterator
while (locationIterator.hasNext) {
@ -946,11 +995,37 @@ private[spark] class BlockManager(
None
}
/**
* Reads the block from the local directories of another executor which runs on the same host.
*/
private[spark] def readDiskBlockFromSameHostExecutor(
blockId: BlockId,
localDirs: Array[String],
blockSize: Long): Option[ManagedBuffer] = {
val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name)
if (file.exists()) {
val managedBuffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// Encrypted blocks cannot be memory mapped; return a special object that does decryption
// and provides InputStream / FileRegion implementations for reading the data.
new EncryptedManagedBuffer(
new EncryptedBlockData(file, blockSize, conf, key))
case _ =>
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
Some(managedBuffer)
} else {
None
}
}
/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
getRemoteManagedBuffer(blockId).map { data =>
getRemoteBlock(blockId, (data: ManagedBuffer) => {
// SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
// ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
// new path is stable.
@ -959,7 +1034,7 @@ private[spark] class BlockManager(
} else {
ChunkedByteBuffer.fromManagedBuffer(data)
}
}
})
}
/**

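A side note on the network fall-back above: because the master now returns the block status up front, `fetchRemoteManagedBuffer` knows the block size before fetching and can decide immediately whether to spill. A condensed restatement (field names as in `BlockManager`; treat this as a sketch):

```scala
// Pass a DownloadFileManager only for blocks too large to buffer in memory; the transfer
// service then streams such blocks to a temp file instead.
val tempFileManager: DownloadFileManager =
  if (blockSize > maxRemoteBlockToMem) remoteBlockTempFileManager else null
```
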
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

@ -56,13 +56,14 @@ class BlockManagerMaster(
* updated BlockManagerId fleshed out with this information.
*/
def registerBlockManager(
blockManagerId: BlockManagerId,
id: BlockManagerId,
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
logInfo(s"Registering BlockManager $id")
val updatedId = driverEndpoint.askSync[BlockManagerId](
RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
@ -85,9 +86,11 @@ class BlockManagerMaster(
}
/** Get locations as well as status of the blockId from the driver */
def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
def getLocationsAndStatus(
blockId: BlockId,
requesterHost: String): Option[BlockLocationsAndStatus] = {
driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
GetLocationsAndStatus(blockId))
GetLocationsAndStatus(blockId, requesterHost))
}
/** Get locations of multiple blockIds from the driver */

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

@ -86,8 +86,8 @@ class BlockManagerMasterEndpoint(
private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@ -97,8 +97,8 @@ class BlockManagerMasterEndpoint(
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
case GetLocationsAndStatus(blockId) =>
context.reply(getLocationsAndStatus(blockId))
case GetLocationsAndStatus(blockId, requesterHost) =>
context.reply(getLocationsAndStatus(blockId, requesterHost))
case GetLocationsMultipleBlockIds(blockIds) =>
context.reply(getLocationsMultipleBlockIds(blockIds))
@ -398,6 +398,7 @@ class BlockManagerMasterEndpoint(
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
@ -433,8 +434,8 @@ class BlockManagerMasterEndpoint(
None
}
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize,
maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), localDirs,
maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@ -499,18 +500,30 @@ class BlockManagerMasterEndpoint(
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
private def getLocationsAndStatus(
blockId: BlockId,
requesterHost: String): Option[BlockLocationsAndStatus] = {
val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
val status = locations.headOption.flatMap { bmId =>
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
Option(blockStatusByShuffleService(bmId).get(blockId))
} else {
blockManagerInfo(bmId).getStatus(blockId)
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
}
}
if (locations.nonEmpty && status.isDefined) {
Some(BlockLocationsAndStatus(locations, status.get))
val localDirs = locations.find { loc =>
if (loc.port != externalShuffleServicePort && loc.host == requesterHost) {
blockManagerInfo
.get(loc)
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
.getOrElse(false)
} else {
false
}
}.map(blockManagerInfo(_).localDirs)
Some(BlockLocationsAndStatus(locations, status.get, localDirs))
} else {
None
}
@ -561,6 +574,7 @@ object BlockStatus {
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val localDirs: Array[String],
val maxOnHeapMem: Long,
val maxOffHeapMem: Long,
val slaveEndpoint: RpcEndpointRef,

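To make the selection rule in `getLocationsAndStatus` concrete, a hypothetical situation (all executor ids and ports are made up):

```scala
// Suppose the requester runs on "host-a" and externalShuffleServicePort is 7337.
// The `find` above skips the first two locations and picks exec-1, so the reply
// carries exec-1's localDirs:
val locations = Seq(
  BlockManagerId("exec-9", "host-b", 38765),  // different host: skipped
  BlockManagerId("fallback", "host-a", 7337), // external shuffle service: skipped by the port check
  BlockManagerId("exec-1", "host-a", 41234)   // same host, block on disk: chosen
)
```
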
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

@ -58,6 +58,7 @@ private[spark] object BlockManagerMessages {
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
sender: RpcEndpointRef)
@ -93,10 +94,20 @@ private[spark] object BlockManagerMessages {
case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster
case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
extends ToBlockManagerMaster
// The response message of `GetLocationsAndStatus` request.
case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
/**
* The response message for a `GetLocationsAndStatus` request.
*
* @param localDirs if the block is persisted to disk by an executor running on the same host
* as the requester, localDirs is Some and the cached data is in a file in one of
* those dirs; otherwise it is None.
*/
case class BlockLocationsAndStatus(
locations: Seq[BlockManagerId],
status: BlockStatus,
localDirs: Option[Array[String]]) {
assert(locations.nonEmpty)
}
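
A hedged sketch of how a fetching executor consumes this reply (names as in the messages above; this condenses the logic in `BlockManager.getRemoteBlock`):

```scala
val direct: Option[ManagedBuffer] =
  master.getLocationsAndStatus(blockId, blockManagerId.host) match {
    case Some(BlockLocationsAndStatus(_, status, Some(localDirs))) =>
      // A same-host executor holds the block on disk: try to read the file directly.
      readDiskBlockFromSameHostExecutor(blockId, localDirs, status.diskSize)
    case _ =>
      None // no same-host disk copy (or unknown block): fall back to the network path
  }
```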

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

@ -44,6 +44,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
private[spark] val localDirsString: Array[String] = localDirs.map(_.toString)
// The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
@ -52,7 +55,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)

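The keep-in-sync comment above is effectively an invariant; a test-style sketch of it (assuming a `DiskBlockManager` instance `dbm`):

```scala
// Both resolvers must map a block name to the same on-disk file.
val name = "rdd_42_0"
assert(dbm.getFile(name).getCanonicalPath ==
  ExecutorDiskUtils.getFile(dbm.localDirsString, dbm.subDirsPerLocalDir, name).getCanonicalPath)
```
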
core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala

@ -31,6 +31,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
val bmInfo = new BlockManagerInfo(
BlockManagerId("executor0", "host", 1234, None),
timeMs = 300,
Array(),
maxOnHeapMem = 10000,
maxOffHeapMem = 20000,
slaveEndpoint = null,

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@ -29,7 +29,8 @@ import scala.reflect.ClassTag
import org.apache.commons.lang3.RandomUtils
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.Mockito.{doAnswer, mock, spy, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.scalatest._
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
@ -46,7 +47,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer,
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalShuffleClient}
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalShuffleClient}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
@ -567,6 +568,114 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getRemoteBytes("list1").isEmpty)
}
Seq(
StorageLevel(useDisk = true, useMemory = false, deserialized = false),
StorageLevel(useDisk = true, useMemory = false, deserialized = true),
StorageLevel(useDisk = true, useMemory = false, deserialized = true, replication = 2)
).foreach { storageLevel =>
test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") {
conf.set("spark.shuffle.io.maxRetries", "0")
val sameHostBm = makeBlockManager(8000, "sameHost", master)
val otherHostTransferSrv = spy(sameHostBm.blockTransferService)
doAnswer { _ =>
"otherHost"
}.when(otherHostTransferSrv).hostName
val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv))
// This test always uses cleanBm to get the block. When replication is used, the block
// can be put via otherHostBm, because the direct disk read will then use the local disk
// of sameHostBm, where the block is replicated to. Without replication, the block must
// be put via sameHostBm directly.
val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm
val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten
val blockId = "list"
bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true)
val sameHostTransferSrv = spy(sameHostBm.blockTransferService)
doAnswer { _ =>
fail("Fetching over network is not expected when the block is requested from same host")
}.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
val cleanBm = makeBlockManager(8000, "clean", master, Some(sameHostTransferSrv))
// check getRemoteBytes
val bytesViaStore1 = cleanBm.getRemoteBytes(blockId)
assert(bytesViaStore1.isDefined)
val expectedContent = sameHostBm.getBlockData(blockId).nioByteBuffer().array()
assert(bytesViaStore1.get.toArray === expectedContent)
// check getRemoteValues
val valueViaStore1 = cleanBm.getRemoteValues[List.type](blockId)
assert(valueViaStore1.isDefined)
assert(valueViaStore1.get.data.toList.head === array)
}
}
private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = {
val testedFunc = if (getValueOrBytes) "getRemoteValues()" else "getRemoteBytes()"
val testNameSuffix = s"$level, $testedFunc"
test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") {
conf.set("spark.shuffle.io.maxRetries", "0")
// tracks whether the local disk of the remote executor running on the same host was used
var sameHostExecutorTried: Boolean = false
val store2 = makeBlockManager(8000, "executor2", this.master,
Some(new MockBlockTransferService(0)))
val blockId = "list"
val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten
store2.putIterator(blockId, List(array).iterator, level, true)
val expectedBlockData = store2.getLocalBytes(blockId)
assert(expectedBlockData.isDefined)
val expectedByteBuffer = expectedBlockData.get.toByteBuffer()
val mockTransferService = new MockBlockTransferService(0) {
override def fetchBlockSync(
host: String,
port: Int,
execId: String,
blockId: String,
tempFileManager: DownloadFileManager): ManagedBuffer = {
assert(sameHostExecutorTried, "the local disk of the remote executor (running on the " +
"same host) is expected to be tried before fetching over the network")
new NioManagedBuffer(expectedByteBuffer)
}
}
val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService))
val spiedStore1 = spy(store1)
doAnswer { inv =>
val blockId = inv.getArguments()(0).asInstanceOf[BlockId]
val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]]
val blockSize = inv.getArguments()(2).asInstanceOf[Long]
val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize)
assert(res.isDefined)
val file = ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name)
// delete the file behind the blockId
assert(file.delete())
sameHostExecutorTried = true
res
}.when(spiedStore1).readDiskBlockFromSameHostExecutor(mc.any(), mc.any(), mc.any())
if (getValueOrBytes) {
val valuesViaStore1 = spiedStore1.getRemoteValues(blockId)
assert(sameHostExecutorTried)
assert(valuesViaStore1.isDefined)
assert(valuesViaStore1.get.data.toList.head === array)
} else {
val bytesViaStore1 = spiedStore1.getRemoteBytes(blockId)
assert(sameHostExecutorTried)
assert(bytesViaStore1.isDefined)
assert(bytesViaStore1.get.toByteBuffer === expectedByteBuffer)
}
}
}
Seq(
StorageLevel(useDisk = true, useMemory = false, deserialized = false),
StorageLevel(useDisk = true, useMemory = false, deserialized = true)
).foreach { storageLevel =>
Seq(true, false).foreach { valueOrBytes =>
testWithFileDelAfterLocalDiskRead(storageLevel, valueOrBytes)
}
}
test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
val store = makeBlockManager(8000, "executor1")
val store2 = makeBlockManager(8000, "executor2")
@ -1315,8 +1424,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// so that we have a chance to do location refresh
val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh)
.map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) }
when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn(
Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty)))
when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn(
Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty, None)))
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(
blockManagerIds)
@ -1325,7 +1434,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val block = store.getRemoteBytes("item")
.asInstanceOf[Option[ByteBuffer]]
assert(block.isDefined)
verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item")
verify(mockBlockManagerMaster, times(1))
.getLocationsAndStatus("item", "MockBlockTransferServiceHost")
verify(mockBlockManagerMaster, times(1)).getLocations("item")
}
@ -1502,8 +1612,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1))
val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L)
when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn(
Option(BlockLocationsAndStatus(blockLocations, blockStatus)))
when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn(
Option(BlockLocationsAndStatus(blockLocations, blockStatus, None)))
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations)
val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,