From b71c130fc6a5b0d25d60337c21cfd9c0203ab036 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Tue, 25 Jun 2019 07:35:44 -0700
Subject: [PATCH] [SPARK-27622][CORE] Avoiding the network when block manager fetches disk persisted RDD blocks from the same host
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Before this PR, fetching a disk-persisted RDD block always used the network to get the requested block content, even when the source and the fetching executor were running on the same host.

The idea of accessing another executor's local disk files by reading the disk directly comes from the external shuffle service, where the local dirs are stored for each executor (block manager).

To make this possible, the following changes are made:
- the `RegisterBlockManager` message is extended with `localDirs`, which is stored by the block manager master for each block manager as a new property of `BlockManagerInfo`
- `GetLocationsAndStatus` is extended with the requester host
- `BlockLocationsAndStatus` (the reply to the `GetLocationsAndStatus` message) is extended with an optional array of local directories, filled with the local directories of a same-host executor (if there is any; otherwise None is used). This is where the block content can be read from.

Shuffle blocks are out of scope of this PR: a separate PR will be opened for that (for another Jira issue).

## How was this patch tested?

With a new unit test in `BlockManagerSuite`. See the test prefixed by "SPARK-27622: avoid the network when block requested from same host".

Closes #24554 from attilapiros/SPARK-27622.

Authored-by: “attilapiros”
Signed-off-by: Marcelo Vanzin
---
 .../network/shuffle/ExecutorDiskUtils.java    |  66 ++++++++++
 .../shuffle/ExternalShuffleBlockResolver.java |  45 +------
 .../ExternalShuffleBlockResolverSuite.java    |   2 +-
 .../shuffle/TestShuffleDataContext.java       |   9 +-
 .../apache/spark/storage/BlockManager.scala   | 121 +++++++++++++----
 .../spark/storage/BlockManagerMaster.scala    |  13 +-
 .../storage/BlockManagerMasterEndpoint.scala  |  32 +++--
 .../spark/storage/BlockManagerMessages.scala  |  17 ++-
 .../spark/storage/DiskBlockManager.scala      |   5 +-
 .../spark/storage/BlockManagerInfoSuite.scala |   1 +
 .../spark/storage/BlockManagerSuite.scala     | 124 +++++++++++++++++-
 11 files changed, 342 insertions(+), 93 deletions(-)
 create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
new file mode 100644
index 0000000000..13f6046dd8
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
@@ -0,0 +1,66 @@
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.spark.network.util.JavaUtils; + +public class ExecutorDiskUtils { + + private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}"); + + /** + * Hashes a filename into the corresponding local directory, in a manner consistent with + * Spark's DiskBlockManager.getFile(). + */ + public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { + int hash = JavaUtils.nonNegativeHash(filename); + String localDir = localDirs[hash % localDirs.length]; + int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; + return new File(createNormalizedInternedPathname( + localDir, String.format("%02x", subDirId), filename)); + } + + /** + * This method is needed to avoid the situation when multiple File instances for the + * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. + * According to measurements, in some scenarios such duplicate strings may waste a lot + * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that + * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise, + * the internal code in java.io.File would normalize it later, creating a new "foo/bar" + * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File + * uses, since it is in the package-private class java.io.FileSystem. 
+ */ + @VisibleForTesting + static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { + String pathname = dir1 + File.separator + dir2 + File.separator + fname; + Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); + pathname = m.replaceAll("/"); + // A single trailing slash needs to be taken care of separately + if (pathname.length() > 1 && pathname.endsWith("/")) { + pathname = pathname.substring(0, pathname.length() - 1); + } + return pathname.intern(); + } + +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 87e6fe12b5..50f16fc700 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.regex.Matcher; import java.util.regex.Pattern; import com.fasterxml.jackson.annotation.JsonCreator; @@ -298,7 +297,7 @@ public class ExternalShuffleBlockResolver { */ private ManagedBuffer getSortBasedShuffleBlockData( ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { - File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.index"); try { @@ -306,7 +305,7 @@ public class ExternalShuffleBlockResolver { ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId); return new FileSegmentManagedBuffer( conf, - getFile(executor.localDirs, executor.subDirsPerLocalDir, + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); @@ -317,7 +316,7 @@ public class ExternalShuffleBlockResolver { public ManagedBuffer getDiskPersistedRddBlockData( ExecutorShuffleInfo executor, int rddId, int splitIndex) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, + File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_" + splitIndex); long fileLength = file.length(); ManagedBuffer res = null; @@ -327,19 +326,6 @@ public class ExternalShuffleBlockResolver { return res; } - /** - * Hashes a filename into the corresponding local directory, in a manner consistent with - * Spark's DiskBlockManager.getFile(). - */ - @VisibleForTesting - static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { - int hash = JavaUtils.nonNegativeHash(filename); - String localDir = localDirs[hash % localDirs.length]; - int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; - return new File(createNormalizedInternedPathname( - localDir, String.format("%02x", subDirId), filename)); - } - void close() { if (db != null) { try { @@ -350,28 +336,6 @@ public class ExternalShuffleBlockResolver { } } - /** - * This method is needed to avoid the situation when multiple File instances for the - * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String. 
- * According to measurements, in some scenarios such duplicate strings may waste a lot - * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that - * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise, - * the internal code in java.io.File would normalize it later, creating a new "foo/bar" - * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File - * uses, since it is in the package-private class java.io.FileSystem. - */ - @VisibleForTesting - static String createNormalizedInternedPathname(String dir1, String dir2, String fname) { - String pathname = dir1 + File.separator + dir2 + File.separator + fname; - Matcher m = MULTIPLE_SEPARATORS.matcher(pathname); - pathname = m.replaceAll("/"); - // A single trailing slash needs to be taken care of separately - if (pathname.length() > 1 && pathname.endsWith("/")) { - pathname = pathname.substring(0, pathname.length() - 1); - } - return pathname.intern(); - } - public int removeBlocks(String appId, String execId, String[] blockIds) { ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId)); if (executor == null) { @@ -380,7 +344,8 @@ public class ExternalShuffleBlockResolver { } int numRemovedBlocks = 0; for (String blockId : blockIds) { - File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + File file = + ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); if (file.delete()) { numRemovedBlocks++; } else { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 459629c5f0..09eb699be3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -149,7 +149,7 @@ public class ExternalShuffleBlockResolverSuite { private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) { String normPathname = - ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3); + ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3); assertEquals(expectedPathname, normPathname); File file = new File(normPathname); String returnedPath = file.getPath(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 10be95ec50..457805feea 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -76,9 +76,9 @@ public class TestShuffleDataContext { try { dataStream = new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); indexStream = new DataOutputStream(new FileOutputStream( - ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); long offset = 0; indexStream.writeLong(offset); @@ -121,10 +121,11 @@ public class TestShuffleDataContext 
{ private void insertFile(String filename, byte[] block) throws IOException { OutputStream dataStream = null; - File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename); + File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename); assert(!file.exists()) : "this test file has been already generated"; try { - dataStream = new FileOutputStream(file); + dataStream = new FileOutputStream( + ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename)); dataStream.write(block); } finally { Closeables.close(dataStream, false); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f7e2493eb0..c3ec1594fa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -43,8 +43,9 @@ import org.apache.spark.internal.config.Network import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ -import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.network.util.TransportConf @@ -138,6 +139,8 @@ private[spark] class BlockManager( private val remoteReadNioBufferConversion = conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION) + private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) + val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = @@ -411,6 +414,7 @@ private[spark] class BlockManager( val idFromMaster = master.registerBlockManager( id, + diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) @@ -445,7 +449,7 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer() { logInfo("Registering executor with local external shuffle service.") val shuffleConfig = new ExecutorShuffleInfo( - diskBlockManager.localDirs.map(_.toString), + diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName) @@ -500,7 +504,8 @@ private[spark] class BlockManager( def reregister(): Unit = { // TODO: We might need to rate limit re-registering. logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) + master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, + maxOffHeapMemory, slaveEndpoint) reportAllBlocks() } @@ -827,10 +832,63 @@ private[spark] class BlockManager( */ private[spark] def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = { val ct = implicitly[ClassTag[T]] - getRemoteManagedBuffer(blockId).map { data => + getRemoteBlock(blockId, (data: ManagedBuffer) => { val values = serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct) new BlockResult(values, DataReadMethod.Network, data.size) + }) + } + + /** + * Get the remote block and transform it to the provided data type. 
+ * + * If the block is persisted to the disk and stored at an executor running on the same host then + * first it is tried to be accessed using the local directories of the other executor directly. + * If the file is successfully identified then tried to be transformed by the provided + * transformation function which expected to open the file. If there is any exception during this + * transformation then block access falls back to fetching it from the remote executor via the + * network. + * + * @param blockId identifies the block to get + * @param bufferTransformer this transformer expected to open the file if the block is backed by a + * file by this it is guaranteed the whole content can be loaded + * @tparam T result type + * @return + */ + private[spark] def getRemoteBlock[T]( + blockId: BlockId, + bufferTransformer: ManagedBuffer => T): Option[T] = { + logDebug(s"Getting remote block $blockId") + require(blockId != null, "BlockId is null") + + // Because all the remote blocks are registered in driver, it is not necessary to ask + // all the slave executors to get block status. + val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) + if (locationsAndStatusOption.isEmpty) { + logDebug(s"Block $blockId is unknown by block manager master") + None + } else { + val locationsAndStatus = locationsAndStatusOption.get + val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize) + + locationsAndStatus.localDirs.flatMap { localDirs => + val blockDataOption = + readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize) + val res = blockDataOption.flatMap { blockData => + try { + Some(bufferTransformer(blockData)) + } catch { + case NonFatal(e) => + logDebug("Block from the same host executor cannot be opened: ", e) + None + } + } + logInfo(s"Read $blockId from the disk of a same host executor is " + + (if (res.isDefined) "successful." else "failed.")) + res + }.orElse { + fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer) + } } } @@ -861,22 +919,12 @@ private[spark] class BlockManager( } /** - * Get block from remote block managers as a ManagedBuffer. + * Fetch the block from remote block managers as a ManagedBuffer. */ - private def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = { - logDebug(s"Getting remote block $blockId") - require(blockId != null, "BlockId is null") - var runningFailureCount = 0 - var totalFailureCount = 0 - - // Because all the remote blocks are registered in driver, it is not necessary to ask - // all the slave executors to get block status. - val locationsAndStatus = master.getLocationsAndStatus(blockId) - val blockSize = locationsAndStatus.map { b => - b.status.diskSize.max(b.status.memSize) - }.getOrElse(0L) - val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty) - + private def fetchRemoteManagedBuffer( + blockId: BlockId, + blockSize: Long, + locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = { // If the block size is above the threshold, we should pass our FileManger to // BlockTransferService, which will leverage it to spill the block; if not, then passed-in // null value means the block will be persisted in memory. 
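// Illustrative sketch (not part of this patch): the "try the same-host executor's disk,
// then fall back to the network" pattern that getRemoteBlock and
// readDiskBlockFromSameHostExecutor implement above, reduced to a self-contained example.
// The names SameHostReadSketch, resolveBlockFile and readWithFallback are hypothetical
// stand-ins, and the hash below is a simplified substitute for the non-negative hash Spark
// uses; only the localDir/subDir layout mirrors ExecutorDiskUtils.getFile.
import java.io.File
import scala.util.Try

object SameHostReadSketch {
  // Hash a block name into one of the executor's local dirs and a sub-directory,
  // following the same directory layout as ExecutorDiskUtils.getFile and
  // DiskBlockManager.getFile.
  def resolveBlockFile(localDirs: Array[String], subDirsPerLocalDir: Int, name: String): File = {
    val hash = name.hashCode & Integer.MAX_VALUE  // simplified non-negative hash
    val localDir = localDirs(hash % localDirs.length)
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    new File(new File(localDir, f"$subDirId%02x"), name)
  }

  // Try the disk of a same-host executor first; on a missing file or a failed read,
  // fall back to the fetch supplied by the caller (the network path).
  def readWithFallback[T](
      sameHostDirs: Option[Array[String]],
      subDirsPerLocalDir: Int,
      blockName: String)(readLocal: File => T)(fetchOverNetwork: => T): T = {
    sameHostDirs
      .map(dirs => resolveBlockFile(dirs, subDirsPerLocalDir, blockName))
      .filter(_.isFile)
      .flatMap(file => Try(readLocal(file)).toOption)
      .getOrElse(fetchOverNetwork)
  }
}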
@@ -885,8 +933,9 @@ private[spark] class BlockManager( } else { null } - - val locations = sortLocations(blockLocations) + var runningFailureCount = 0 + var totalFailureCount = 0 + val locations = sortLocations(locationsAndStatus.locations) val maxFetchFailures = locations.size var locationIterator = locations.iterator while (locationIterator.hasNext) { @@ -946,11 +995,37 @@ private[spark] class BlockManager( None } + /** + * Reads the block from the local directories of another executor which runs on the same host. + */ + private[spark] def readDiskBlockFromSameHostExecutor( + blockId: BlockId, + localDirs: Array[String], + blockSize: Long): Option[ManagedBuffer] = { + val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name) + if (file.exists()) { + val mangedBuffer = securityManager.getIOEncryptionKey() match { + case Some(key) => + // Encrypted blocks cannot be memory mapped; return a special object that does decryption + // and provides InputStream / FileRegion implementations for reading the data. + new EncryptedManagedBuffer( + new EncryptedBlockData(file, blockSize, conf, key)) + + case _ => + val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") + new FileSegmentManagedBuffer(transportConf, file, 0, file.length) + } + Some(mangedBuffer) + } else { + None + } + } + /** * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { - getRemoteManagedBuffer(blockId).map { data => + getRemoteBlock(blockId, (data: ManagedBuffer) => { // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if // new path is stable. @@ -959,7 +1034,7 @@ private[spark] class BlockManager( } else { ChunkedByteBuffer.fromManagedBuffer(data) } - } + }) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 939a134e86..9d13fedfb0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -56,13 +56,14 @@ class BlockManagerMaster( * updated BlockManagerId fleshed out with this information. 
*/ def registerBlockManager( - blockManagerId: BlockManagerId, + id: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { - logInfo(s"Registering BlockManager $blockManagerId") + logInfo(s"Registering BlockManager $id") val updatedId = driverEndpoint.askSync[BlockManagerId]( - RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId } @@ -85,9 +86,11 @@ class BlockManagerMaster( } /** Get locations as well as status of the blockId from the driver */ - def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = { + def getLocationsAndStatus( + blockId: BlockId, + requesterHost: String): Option[BlockLocationsAndStatus] = { driverEndpoint.askSync[Option[BlockLocationsAndStatus]]( - GetLocationsAndStatus(blockId)) + GetLocationsAndStatus(blockId, requesterHost)) } /** Get locations of multiple blockIds from the driver */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2057d1b6c0..040fed299f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -86,8 +86,8 @@ class BlockManagerMasterEndpoint( private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => - context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) + case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => + context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -97,8 +97,8 @@ class BlockManagerMasterEndpoint( case GetLocations(blockId) => context.reply(getLocations(blockId)) - case GetLocationsAndStatus(blockId) => - context.reply(getLocationsAndStatus(blockId)) + case GetLocationsAndStatus(blockId, requesterHost) => + context.reply(getLocationsAndStatus(blockId, requesterHost)) case GetLocationsMultipleBlockIds(blockIds) => context.reply(getLocationsMultipleBlockIds(blockIds)) @@ -398,6 +398,7 @@ class BlockManagerMasterEndpoint( */ private def register( idWithoutTopologyInfo: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { @@ -433,8 +434,8 @@ class BlockManagerMasterEndpoint( None } - blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize, - maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), localDirs, + maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) @@ -499,18 +500,30 @@ class BlockManagerMasterEndpoint( if (blockLocations.containsKey(blockId)) 
blockLocations.get(blockId).toSeq else Seq.empty } - private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = { + private def getLocationsAndStatus( + blockId: BlockId, + requesterHost: String): Option[BlockLocationsAndStatus] = { val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty) val status = locations.headOption.flatMap { bmId => if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { Option(blockStatusByShuffleService(bmId).get(blockId)) } else { - blockManagerInfo(bmId).getStatus(blockId) + blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId)) } } if (locations.nonEmpty && status.isDefined) { - Some(BlockLocationsAndStatus(locations, status.get)) + val localDirs = locations.find { loc => + if (loc.port != externalShuffleServicePort && loc.host == requesterHost) { + blockManagerInfo + .get(loc) + .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk)) + .getOrElse(false) + } else { + false + } + }.map(blockManagerInfo(_).localDirs) + Some(BlockLocationsAndStatus(locations, status.get, localDirs)) } else { None } @@ -561,6 +574,7 @@ object BlockStatus { private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, timeMs: Long, + val localDirs: Array[String], val maxOnHeapMem: Long, val maxOffHeapMem: Long, val slaveEndpoint: RpcEndpointRef, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 382501adb1..895f48d070 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -58,6 +58,7 @@ private[spark] object BlockManagerMessages { case class RegisterBlockManager( blockManagerId: BlockManagerId, + localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, sender: RpcEndpointRef) @@ -93,10 +94,20 @@ private[spark] object BlockManagerMessages { case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster - case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster + case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String) + extends ToBlockManagerMaster - // The response message of `GetLocationsAndStatus` request. - case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) { + /** + * The response message of `GetLocationsAndStatus` request. + * + * @param localDirs if it is persisted-to-disk on the same host as the requester executor is + * running on then localDirs will be Some and the cached data will be in a file + * in one of those dirs, otherwise it is None. 
+ */ + case class BlockLocationsAndStatus( + locations: Seq[BlockManagerId], + status: BlockStatus, + localDirs: Option[Array[String]]) { assert(locations.nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 95ce4b0f09..c3990bf71e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,6 +44,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + + private[spark] val localDirsString: Array[String] = localDirs.map(_.toString) + // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) @@ -52,7 +55,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). + // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala index 01e3d6a46e..49cbd66ccc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala @@ -31,6 +31,7 @@ class BlockManagerInfoSuite extends SparkFunSuite { val bmInfo = new BlockManagerInfo( BlockManagerId("executor0", "host", 1234, None), timeMs = 300, + Array(), maxOnHeapMem = 10000, maxOffHeapMem = 20000, slaveEndpoint = null, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ab4693e86b..2d6e151f81 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -29,7 +29,8 @@ import scala.reflect.ClassTag import org.apache.commons.lang3.RandomUtils import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{doAnswer, mock, spy, times, verify, when} +import org.mockito.invocation.InvocationOnMock import org.scalatest._ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ @@ -46,7 +47,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExternalShuffleClient} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalShuffleClient} 
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus @@ -567,6 +568,114 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.getRemoteBytes("list1").isEmpty) } + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true), + StorageLevel(useDisk = true, useMemory = false, deserialized = true, replication = 2) + ).foreach { storageLevel => + test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") { + conf.set("spark.shuffle.io.maxRetries", "0") + val sameHostBm = makeBlockManager(8000, "sameHost", master) + + val otherHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + "otherHost" + }.when(otherHostTransferSrv).hostName + val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv)) + + // This test always uses the cleanBm to get the block. In case of replication + // the block can be added to the otherHostBm as direct disk read will use + // the local disk of sameHostBm where the block is replicated to. + // When there is no replication then block must be added via sameHostBm directly. + val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + val blockId = "list" + bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true) + + val sameHostTransferSrv = spy(sameHostBm.blockTransferService) + doAnswer { _ => + fail("Fetching over network is not expected when the block is requested from same host") + }.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) + val cleanBm = makeBlockManager(8000, "clean", master, Some(sameHostTransferSrv)) + + // check getRemoteBytes + val bytesViaStore1 = cleanBm.getRemoteBytes(blockId) + assert(bytesViaStore1.isDefined) + val expectedContent = sameHostBm.getBlockData(blockId).nioByteBuffer().array() + assert(bytesViaStore1.get.toArray === expectedContent) + + // check getRemoteValues + val valueViaStore1 = cleanBm.getRemoteValues[List.type](blockId) + assert(valueViaStore1.isDefined) + assert(valueViaStore1.get.data.toList.head === array) + } + } + + private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = { + val testedFunc = if (getValueOrBytes) "getRemoteValue()" else "getRemoteBytes()" + val testNameSuffix = s"$level, $testedFunc" + test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") { + conf.set("spark.shuffle.io.maxRetries", "0") + // variable to check the usage of the local disk of the remote executor on the same host + var sameHostExecutorTried: Boolean = false + val store2 = makeBlockManager(8000, "executor2", this.master, + Some(new MockBlockTransferService(0))) + val blockId = "list" + val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + store2.putIterator(blockId, List(array).iterator, level, true) + val expectedBlockData = store2.getLocalBytes(blockId) + assert(expectedBlockData.isDefined) + val expectedByteBuffer = expectedBlockData.get.toByteBuffer() + val mockTransferService = new MockBlockTransferService(0) { + override def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String, + tempFileManager: DownloadFileManager): ManagedBuffer = { + 
assert(sameHostExecutorTried, "before using the network local disk of the remote " + + "executor (running on the same host) is expected to be tried") + new NioManagedBuffer(expectedByteBuffer) + } + } + val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService)) + val spiedStore1 = spy(store1) + doAnswer { inv => + val blockId = inv.getArguments()(0).asInstanceOf[BlockId] + val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]] + val blockSize = inv.getArguments()(2).asInstanceOf[Long] + val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize) + assert(res.isDefined) + val file = ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name) + // delete the file behind the blockId + assert(file.delete()) + sameHostExecutorTried = true + res + }.when(spiedStore1).readDiskBlockFromSameHostExecutor(mc.any(), mc.any(), mc.any()) + + if (getValueOrBytes) { + val valuesViaStore1 = spiedStore1.getRemoteValues(blockId) + assert(sameHostExecutorTried) + assert(valuesViaStore1.isDefined) + assert(valuesViaStore1.get.data.toList.head === array) + } else { + val bytesViaStore1 = spiedStore1.getRemoteBytes(blockId) + assert(sameHostExecutorTried) + assert(bytesViaStore1.isDefined) + assert(bytesViaStore1.get.toByteBuffer === expectedByteBuffer) + } + } + } + + Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true) + ).foreach { storageLevel => + Seq(true, false).foreach { valueOrBytes => + testWithFileDelAfterLocalDiskRead(storageLevel, valueOrBytes) + } + } + test("SPARK-14252: getOrElseUpdate should still read from remote storage") { val store = makeBlockManager(8000, "executor1") val store2 = makeBlockManager(8000, "executor2") @@ -1315,8 +1424,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // so that we have a chance to do location refresh val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh) .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) } - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty, None))) when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn( blockManagerIds) @@ -1325,7 +1434,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val block = store.getRemoteBytes("item") .asInstanceOf[Option[ByteBuffer]] assert(block.isDefined) - verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item") + verify(mockBlockManagerMaster, times(1)) + .getLocationsAndStatus("item", "MockBlockTransferServiceHost") verify(mockBlockManagerMaster, times(1)).getLocations("item") } @@ -1502,8 +1612,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1)) val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L) - when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn( - Option(BlockLocationsAndStatus(blockLocations, blockStatus))) + when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn( + Option(BlockLocationsAndStatus(blockLocations, blockStatus, None))) 
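// Hypothetical variant (not part of this patch): the stub above keeps localDirs as None,
// which forces the network path. Returning Some(localDirs) for a disk-persisted, same-host
// copy would instead make the BlockManager attempt readDiskBlockFromSameHostExecutor before
// any network fetch. Here `localDirs` is an assumed in-scope Array[String] holding that
// executor's local dirs; the mocks and matchers are the same as in the surrounding test.
when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId], mc.any[String])).thenReturn(
  Option(BlockLocationsAndStatus(blockLocations, blockStatus, Some(localDirs))))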
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations) val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,