diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
new file mode 100644
index 0000000000..f6e46ae9a4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkException
+
+private[spark]
+case class BlockFetchException(messages: String, throwable: Throwable)
+  extends SparkException(messages, throwable)
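Note for reviewers: the new exception wraps the most recent underlying failure once every location has been tried, and since it extends SparkException, existing SparkException handlers still match it. As a hedged illustration (not part of the patch), a caller that wants to treat an exhausted fetch as a missing block might look like the sketch below; `remoteBytesOrNone` and `readRemote` are hypothetical stand-ins for a wrapper around BlockManager.getRemoteBytes, and because BlockFetchException is private[spark], such a helper would have to live inside Spark's own source tree.

    import java.nio.ByteBuffer

    import org.apache.spark.storage.{BlockFetchException, BlockId}

    // Illustration only, not part of the patch. `readRemote` stands in for
    // BlockManager.getRemoteBytes.
    object FetchFallbackSketch {
      def remoteBytesOrNone(
          blockId: BlockId,
          readRemote: BlockId => Option[ByteBuffer]): Option[ByteBuffer] = {
        try {
          readRemote(blockId)
        } catch {
          case e: BlockFetchException =>
            // Every known location failed; the most recent cause is preserved
            // in e.getCause if the caller wants to log it before moving on.
            None
        }
      }
    }
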
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fefaef0ab8..d31aa68eb6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
+import scala.util.control.NonFatal
 import scala.util.Random
 
 import sun.nio.ch.DirectBuffer
@@ -600,10 +601,26 @@ private[spark] class BlockManager(
   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
     val locations = Random.shuffle(master.getLocations(blockId))
+    var numFetchFailures = 0
     for (loc <- locations) {
       logDebug(s"Getting remote block $blockId from $loc")
-      val data = blockTransferService.fetchBlockSync(
-        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+      val data = try {
+        blockTransferService.fetchBlockSync(
+          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+      } catch {
+        case NonFatal(e) =>
+          numFetchFailures += 1
+          if (numFetchFailures == locations.size) {
+            // An exception was thrown while fetching this block from every location
+            throw new BlockFetchException(s"Failed to fetch block from" +
+              s" ${locations.size} locations. Most recent failure cause:", e)
+          } else {
+            // This location failed, so we retry the fetch from a different location by returning null here
+            logWarning(s"Failed to fetch remote block $blockId " +
+              s"from $loc (failed attempt $numFetchFailures)", e)
+            null
+          }
+      }
 
       if (data != null) {
         if (asBlockResult) {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f480fd107a..e5b54d66c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -47,6 +47,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
+  var store3: BlockManager = null
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
   conf.set("spark.authenticate", "false")
@@ -99,6 +100,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store2.stop()
       store2 = null
     }
+    if (store3 != null) {
+      store3.stop()
+      store3 = null
+    }
     rpcEnv.shutdown()
     rpcEnv.awaitTermination()
     rpcEnv = null
@@ -443,6 +448,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
+  test("SPARK-9591: getRemoteBytes from another location when an exception is thrown") {
+    val origTimeoutOpt = conf.getOption("spark.network.timeout")
+    try {
+      conf.set("spark.network.timeout", "2s")
+      store = makeBlockManager(8000, "executor1")
+      store2 = makeBlockManager(8000, "executor2")
+      store3 = makeBlockManager(8000, "executor3")
+      val list1 = List(new Array[Byte](4000))
+      store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      var list1Get = store.getRemoteBytes("list1")
+      assert(list1Get.isDefined, "list1Get expected to be fetched")
+      // Stop one of the block managers holding the block
+      store2.stop()
+      store2 = null
+      list1Get = store.getRemoteBytes("list1")
+      // The `list1` block can still be fetched from the remaining location
+      assert(list1Get.isDefined, "list1Get expected to be fetched")
+      store3.stop()
+      store3 = null
+      // An exception is thrown because no locations are left
+      intercept[BlockFetchException] {
+        list1Get = store.getRemoteBytes("list1")
+      }
+    } finally {
+      origTimeoutOpt match {
+        case Some(t) => conf.set("spark.network.timeout", t)
+        case None => conf.remove("spark.network.timeout")
+      }
+    }
+  }
+
   test("in-memory LRU storage") {
     store = makeBlockManager(12000)
     val a1 = new Array[Byte](4000)
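The heart of the change is the retry loop in doGetRemote: count failures, keep trying the remaining locations, and only surface an error once every location has failed, preserving the most recent cause. Below is a minimal, self-contained sketch of that same pattern, not the patch itself: `Location`, `fetch`, and `fetchFirstAvailable` are hypothetical stand-ins for BlockManagerId, blockTransferService.fetchBlockSync, and the loop body above, and a plain RuntimeException stands in for BlockFetchException.

    import scala.util.control.NonFatal

    // Standalone sketch of the retry-across-locations pattern from doGetRemote.
    object RetryAcrossLocationsSketch {
      final case class Location(host: String, port: Int)

      /** Try each location in turn; rethrow only after all of them have failed. */
      def fetchFirstAvailable[T](locations: Seq[Location])(fetch: Location => T): T = {
        require(locations.nonEmpty, "no locations to fetch from")
        var numFetchFailures = 0
        var result: Option[T] = None
        val it = locations.iterator
        while (result.isEmpty && it.hasNext) {
          val loc = it.next()
          try {
            result = Some(fetch(loc))
          } catch {
            case NonFatal(e) =>
              numFetchFailures += 1
              if (numFetchFailures == locations.size) {
                // All locations exhausted: give up, keeping the latest cause.
                throw new RuntimeException(
                  s"Failed to fetch block from ${locations.size} locations. " +
                    "Most recent failure cause:", e)
              }
              // Otherwise fall through and try the next location.
          }
        }
        result.get
      }

      // Example use (download is hypothetical):
      //   fetchFirstAvailable(Seq(Location("host1", 7077), Location("host2", 7077))) { loc =>
      //     download(loc)
      //   }
    }

One design note worth calling out: comparing the failure count against locations.size is what lets the loop decide "this was the last chance" without the `for` comprehension needing any direct way to know it is on the final element, and catching only NonFatal ensures fatal errors such as OutOfMemoryError still propagate immediately.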