[SPARK-36098][CORE] Grouping exception in core/storage

### What changes were proposed in this pull request?
This PR groups the exception messages thrown from core/src/main/scala/org/apache/spark/storage into `SparkCoreErrors`.

### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
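No. Error messages remain unchanged. Note that two call sites in BlockManager that previously threw a plain `Exception` (async reregistration wait, block store failure) now throw `SparkException` with the same message.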

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #33530 from dgd-contributor/SPARK_36098_group_exception_core_storage.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
dgd-contributor 2021-08-06 16:17:53 +08:00 committed by Wenchen Fan
parent c97fb68885
commit 7bb53b85f0
10 changed files with 133 additions and 38 deletions

View file

@ -22,7 +22,8 @@ import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.storage.{BlockId, RDDBlockId}
import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockNotFoundException, BlockSavedOnDecommissionedBlockManagerException, RDDBlockId, UnrecognizedBlockId}
/**
* Object for grouping error messages from (most) exceptions thrown during query execution.
@ -141,4 +142,98 @@ object SparkCoreErrors {
/** Builds a [[SparkException]] stating that a checkpoint directory must be specified. */
def mustSpecifyCheckpointDirError(): Throwable =
  new SparkException("Checkpoint dir must be specified.")
/**
 * Builds the [[UnrecognizedBlockId]] error for a block name.
 *
 * @param name the block name, passed unchanged to the exception
 */
def unrecognizedBlockIdError(name: String): Throwable =
  new UnrecognizedBlockId(name)
/**
 * Builds a [[SparkException]] reporting that a task has not locked a block for writing.
 *
 * @param currentTaskAttemptId id of the task attempt named in the message
 * @param blockId the block named in the message
 */
def taskHasNotLockedBlockError(currentTaskAttemptId: Long, blockId: BlockId): Throwable = {
  val message = s"Task ${currentTaskAttemptId} has not locked block ${blockId} for writing"
  new SparkException(message)
}
/**
 * Builds a [[SparkException]] reporting that a block does not exist.
 *
 * @param blockId the block named in the message
 */
def blockDoesNotExistError(blockId: BlockId): Throwable = {
  val message = s"Block ${blockId} does not exist"
  new SparkException(message)
}
/**
 * Builds the [[BlockSavedOnDecommissionedBlockManagerException]] for a block.
 *
 * @param blockId the block passed to the exception
 */
def cannotSaveBlockOnDecommissionedExecutorError(blockId: BlockId): Throwable =
  new BlockSavedOnDecommissionedBlockManagerException(blockId)
/**
 * Wraps a failure seen while waiting for replication to finish in a [[SparkException]].
 *
 * @param e the underlying failure, kept as the cause
 */
def waitingForReplicationToFinishError(e: Throwable): Throwable = {
  val message = "Error occurred while waiting for replication to finish"
  new SparkException(message, e)
}
/**
 * Wraps a failure to register with the external shuffle server in a [[SparkException]].
 *
 * The message embeds `e.getMessage`, and `e` itself is kept as the cause.
 *
 * @param e the underlying failure
 */
def unableToRegisterWithExternalShuffleServerError(e: Throwable): Throwable = {
  val reason = e.getMessage
  new SparkException(
    s"Unable to register with external shuffle server due to : $reason",
    e)
}
/**
 * Wraps a failure seen while waiting for async reregistration in a [[SparkException]].
 *
 * @param e the underlying failure, kept as the cause
 */
def waitingForAsyncReregistrationError(e: Throwable): Throwable = {
  val message = "Error occurred while waiting for async. reregistration"
  new SparkException(message, e)
}
/**
 * Builds a [[SparkException]] for a shuffle block whose resolver is unsupported.
 *
 * NOTE(review): this helper takes no cause, so whatever exception triggered it is not
 * chained onto the result.
 *
 * @param shuffleManager manager whose `shuffleBlockResolver` is printed in the message
 * @param blockId the shuffle block named in the message
 */
def unexpectedShuffleBlockWithUnsupportedResolverError(
    shuffleManager: ShuffleManager,
    blockId: BlockId): Throwable = {
  val blockPart = s"Unexpected shuffle block ${blockId} with unsupported shuffle "
  val resolverPart = s"resolver ${shuffleManager.shuffleBlockResolver}"
  new SparkException(blockPart + resolverPart)
}
/**
 * Builds a [[SparkException]] reporting a failure to store a block on a block manager.
 *
 * @param blockManagerId the block manager named in the message
 * @param blockId the block named in the message
 */
def failToStoreBlockOnBlockManagerError(
    blockManagerId: BlockManagerId,
    blockId: BlockId): Throwable = {
  val message = s"Failure while trying to store block ${blockId} on ${blockManagerId}."
  new SparkException(message)
}
/**
 * Builds a [[SparkException]] reporting that a read-locked block was not found.
 *
 * @param blockId the block named in the message
 */
def readLockedBlockNotFoundError(blockId: BlockId): Throwable = {
  val message = s"Block ${blockId} was not found even though it's read-locked"
  new SparkException(message)
}
/**
 * Builds a [[SparkException]] reporting that `get()` failed for a block while a lock was held.
 *
 * @param blockId the block named in the message
 */
def failToGetBlockWithLockError(blockId: BlockId): Throwable = {
  val message = s"get() failed for block ${blockId} even though we held a lock"
  new SparkException(message)
}
/**
 * Builds the [[BlockNotFoundException]] for a block.
 *
 * @param blockId the block; its `toString` form is passed to the exception
 */
def blockNotFoundError(blockId: BlockId): Throwable = {
  val blockName = blockId.toString
  new BlockNotFoundException(blockName)
}
/** Builds an [[InterruptedException]] with no detail message. */
def interruptedError(): Throwable =
  new InterruptedException()
/**
 * Builds a [[SparkException]] reporting that a BlockStatus query returned null.
 *
 * @param blockId the queried block, named in the message
 */
def blockStatusQueryReturnedNullError(blockId: BlockId): Throwable = {
  val message = s"BlockManager returned null for BlockStatus query: ${blockId}"
  new SparkException(message)
}
/** Builds a [[SparkException]] reporting that BlockManagerMasterEndpoint returned false. */
def unexpectedBlockManagerMasterEndpointResultError(): Throwable =
  new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
/**
 * Builds an [[IOException]] reporting that a directory could not be created.
 *
 * @param path the directory path shown in the message
 * @param maxAttempts the attempt count shown in the message
 */
def failToCreateDirectoryError(path: String, maxAttempts: Int): Throwable = {
  val message =
    s"Failed to create directory $path with permission 770 after ${maxAttempts} attempts!"
  new IOException(message)
}
/** Builds an [[UnsupportedOperationException]] with no detail message. */
def unsupportedOperationError(): Throwable =
  new UnsupportedOperationException()
/** Builds a [[NoSuchElementException]] with no detail message. */
def noSuchElementError(): Throwable =
  new NoSuchElementException()
/**
 * Builds a [[FetchFailedException]], forwarding every argument in the order received.
 *
 * @param bmAddress block manager address passed to the exception
 * @param shuffleId shuffle id passed to the exception
 * @param mapId map id passed to the exception
 * @param mapIndex map index passed to the exception
 * @param reduceId reduce id passed to the exception
 * @param message failure description passed to the exception
 * @param cause underlying failure; defaults to null when the caller has none
 */
def fetchFailedError(
    bmAddress: BlockManagerId,
    shuffleId: Int,
    mapId: Long,
    mapIndex: Int,
    reduceId: Int,
    message: String,
    cause: Throwable = null): Throwable =
  new FetchFailedException(
    bmAddress,
    shuffleId,
    mapId,
    mapIndex,
    reduceId,
    message,
    cause)
/**
 * Wraps a failure to get a block that is not a shuffle block in a [[SparkException]].
 *
 * @param blockId the block named in the message
 * @param e the underlying failure, kept as the cause
 */
def failToGetNonShuffleBlockError(blockId: BlockId, e: Throwable): Throwable = {
  val message = s"Failed to get block ${blockId}, which is not a shuffle block"
  new SparkException(message, e)
}
}

View file

@ -21,6 +21,7 @@ import java.util.UUID
import org.apache.spark.SparkException
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.network.shuffle.RemoteBlockPushResolver
/**
@ -250,7 +251,6 @@ object BlockId {
TempShuffleBlockId(UUID.fromString(uuid))
case TEST(value) =>
TestBlockId(value)
case _ =>
throw new UnrecognizedBlockId(name)
case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name)
}
}

View file

@ -25,7 +25,8 @@ import scala.reflect.ClassTag
import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset}
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.TaskContext
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
@ -247,13 +248,12 @@ private[storage] class BlockInfoManager extends Logging {
infos.get(blockId) match {
case Some(info) =>
if (info.writerTask != currentTaskAttemptId) {
throw new SparkException(
s"Task $currentTaskAttemptId has not locked block $blockId for writing")
throw SparkCoreErrors.taskHasNotLockedBlockError(currentTaskAttemptId, blockId)
} else {
info
}
case None =>
throw new SparkException(s"Block $blockId does not exist")
throw SparkCoreErrors.blockDoesNotExistError(blockId)
}
}

View file

@ -38,6 +38,7 @@ import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
@ -271,7 +272,7 @@ private[spark] class BlockManager(
// Don't reject broadcast blocks since they may be stored during task exec and
// don't need to be migrated.
if (isDecommissioning() && !blockId.isBroadcast) {
throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
throw SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId)
}
}
@ -414,8 +415,7 @@ private[spark] class BlockManager(
try {
ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new SparkException("Error occurred while waiting for replication to finish", t)
case NonFatal(t) => throw SparkCoreErrors.waitingForReplicationToFinishError(t)
}
}
if (blockWasSuccessfullyStored) {
@ -586,9 +586,7 @@ private[spark] class BlockManager(
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000L)
case NonFatal(e) =>
throw new SparkException("Unable to register with external shuffle server due to : " +
e.getMessage, e)
case NonFatal(e) => throw SparkCoreErrors.unableToRegisterWithExternalShuffleServerError(e)
}
}
}
@ -656,7 +654,7 @@ private[spark] class BlockManager(
ThreadUtils.awaitReady(task, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for async. reregistration", t)
throw SparkCoreErrors.waitingForAsyncReregistrationError(t)
}
}
}
@ -693,7 +691,7 @@ private[spark] class BlockManager(
// likely that the master has outdated block statuses for this block. Therefore, we send
// an RPC so that this block is marked as being unavailable from this block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
throw SparkCoreErrors.blockNotFoundError(blockId)
}
}
}
@ -724,9 +722,9 @@ private[spark] class BlockManager(
try {
return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager)
} catch {
case e: ClassCastException => throw new SparkException(
s"Unexpected shuffle block ${blockId} with unsupported shuffle " +
s"resolver ${shuffleManager.shuffleBlockResolver}")
case e: ClassCastException =>
throw SparkCoreErrors.unexpectedShuffleBlockWithUnsupportedResolverError(shuffleManager,
blockId)
}
}
logDebug(s"Putting regular block ${blockId}")
@ -754,7 +752,7 @@ private[spark] class BlockManager(
val blockStored = TempFileBasedBlockStoreUpdater(
blockId, level, classTag, tmpFile, blockSize).save()
if (!blockStored) {
throw new Exception(s"Failure while trying to store block $blockId on $blockManagerId.")
throw SparkCoreErrors.failToStoreBlockOnBlockManagerError(blockManagerId, blockId)
}
}
@ -896,7 +894,7 @@ private[spark] class BlockManager(
releaseLock(blockId)
// Remove the missing block so that its unavailability is reported to the driver
removeBlock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
throw SparkCoreErrors.readLockedBlockNotFoundError(blockId)
}
/**
@ -1301,7 +1299,7 @@ private[spark] class BlockManager(
// Since we held a read lock between the doPut() and get() calls, the block should not
// have been evicted, so get() not returning the block indicates some internal error.
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
throw SparkCoreErrors.failToGetBlockWithLockError(blockId)
}
// We already hold a read lock on the block from the doPut() call and getLocalValues()
// acquires the lock again, so we need to call releaseLock() here so that the net number

View file

@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.shuffle.ShuffleBlockInfo
@ -90,7 +91,7 @@ private[storage] class BlockManagerDecommissioner(
case None => Thread.sleep(1000)
}
}
throw new InterruptedException()
throw SparkCoreErrors.interruptedError()
}
override def run(): Unit = {

View file

@ -21,7 +21,8 @@ import scala.collection.generic.CanBuildFrom
import scala.collection.immutable.Iterable
import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages._
@ -240,7 +241,7 @@ class BlockManagerMaster(
val blockStatus = timeout.awaitResult(
Future.sequence(futures)(cbf, ThreadUtils.sameThread))
if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
throw SparkCoreErrors.blockStatusQueryReturnedNullError(blockId)
}
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
@ -280,7 +281,7 @@ class BlockManagerMaster(
/** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
private def tell(message: Any): Unit = {
if (!driverEndpoint.askSync[Boolean](message)) {
throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
throw SparkCoreErrors.unexpectedBlockManagerMasterEndpointResultError()
}
}

View file

@ -17,5 +17,5 @@
package org.apache.spark.storage
private[storage] class BlockSavedOnDecommissionedBlockManagerException(blockId: BlockId)
class BlockSavedOnDecommissionedBlockManagerException(blockId: BlockId)
extends Exception(s"Block $blockId cannot be saved on decommissioned executor")

View file

@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.SparkConf
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.network.shuffle.ExecutorDiskUtils
@ -251,9 +252,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
while (created == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException(
s"Failed to create directory ${dirToCreate.getAbsolutePath} with permission " +
s"770 after $maxAttempts attempts!")
throw SparkCoreErrors.failToCreateDirectoryError(dirToCreate.getAbsolutePath, maxAttempts)
}
try {
val builder = new ProcessBuilder().command(

View file

@ -21,6 +21,7 @@ import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.util.zip.Checksum
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.io.MutableCheckedOutputStream
import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
@ -275,7 +276,7 @@ private[spark] class DiskBlockObjectWriter(
recordWritten()
}
override def write(b: Int): Unit = throw new UnsupportedOperationException()
override def write(b: Int): Unit = throw SparkCoreErrors.unsupportedOperationError()
override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
if (!streamOpen) {

View file

@ -32,14 +32,15 @@ import io.netty.util.internal.OutOfDirectMemoryError
import org.apache.commons.io.IOUtils
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.{MapOutputTracker, SparkException, TaskContext}
import org.apache.spark.{MapOutputTracker, TaskContext}
import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.shuffle._
import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
import org.apache.spark.network.util.{NettyUtils, TransportConf}
import org.apache.spark.shuffle.{FetchFailedException, ShuffleReadMetricsReporter}
import org.apache.spark.shuffle.ShuffleReadMetricsReporter
import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
/**
@ -733,7 +734,7 @@ final class ShuffleBlockFetcherIterator(
*/
override def next(): (BlockId, InputStream) = {
if (!hasNext) {
throw new NoSuchElementException()
throw SparkCoreErrors.noSuchElementError()
}
numBlocksProcessed += 1
@ -1153,12 +1154,11 @@ final class ShuffleBlockFetcherIterator(
val msg = message.getOrElse(e.getMessage)
blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) =>
throw new FetchFailedException(address, shufId, mapId, mapIndex, reduceId, msg, e)
throw SparkCoreErrors.fetchFailedError(address, shufId, mapId, mapIndex, reduceId, msg, e)
case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
throw new FetchFailedException(address, shuffleId, mapId, mapIndex, startReduceId, msg, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, mapIndex, startReduceId,
msg, e)
case _ => throw SparkCoreErrors.failToGetNonShuffleBlockError(blockId, e)
}
}