[SPARK-35056][SQL] Group exception messages in execution/streaming

### What changes were proposed in this pull request?
This PR groups the exception messages in `sql/core/src/main/scala/org/apache/spark/sql/execution/streaming` into `QueryExecutionErrors`.
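
For example, a message that was previously constructed inline at the call site is now returned by a dedicated factory method. A minimal sketch of the pattern follows (the real methods live in `org.apache.spark.sql.errors.QueryExecutionErrors`; the method shown is one of those added in this PR, and the sketch object name is only a stand-in):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.Path

// Sketch only: stands in for org.apache.spark.sql.errors.QueryExecutionErrors.
object QueryExecutionErrorsSketch {
  // The factory method owns both the exception type and the message text,
  // so wording can be standardized and maintained in one place.
  def batchMetadataFileNotFoundError(batchMetadataFile: Path): Throwable =
    new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
}

// Call site before:
//   throw new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
// Call site after:
//   throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
```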

### Why are the changes needed?
It helps standardize error messages and makes them easier to maintain.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests. All existing tests pass, which confirms the change does not break any existing behavior.

Closes #32880 from beliefer/SPARK-35056.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
gengjiaan 2021-06-15 12:19:52 +03:00 committed by Max Gekk
parent 195090afcc
commit b191d720e1
9 changed files with 121 additions and 47 deletions


@@ -26,7 +26,7 @@ import java.time.temporal.ChronoField
import java.util.ConcurrentModificationException
import com.fasterxml.jackson.core.JsonToken
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
import org.apache.spark.{Partition, SparkException, SparkUpgradeException}
@@ -48,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String
@@ -1329,4 +1330,96 @@ object QueryExecutionErrors {
def illegalLocationClauseForViewPartitionError(): Throwable = {
new SparkException("LOCATION clause illegal for view partition")
}
def renamePathAsExistsPathError(srcPath: Path, dstPath: Path): Throwable = {
new FileAlreadyExistsException(
s"Failed to rename $srcPath to $dstPath as destination already exists")
}
def renameAsExistsPathError(dstPath: Path): Throwable = {
new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
}
def renameSrcPathNotFoundError(srcPath: Path): Throwable = {
new FileNotFoundException(s"Failed to rename as $srcPath was not found")
}
def failedRenameTempFileError(srcPath: Path, dstPath: Path): Throwable = {
new IOException(s"Failed to rename temp file $srcPath to $dstPath as rename returned false")
}
def legacyMetadataPathExistsError(metadataPath: Path, legacyMetadataPath: Path): Throwable = {
new SparkException(
s"""
|Error: we detected a possible problem with the location of your "_spark_metadata"
|directory and you likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out the
|"_spark_metadata" directory for structured streaming. While this was corrected in
|Spark 3.0, it appears that your query was started using an earlier version that
|incorrectly handled the "_spark_metadata" path.
|
|Correct "_spark_metadata" Directory: $metadataPath
|Incorrect "_spark_metadata" Directory: $legacyMetadataPath
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}.
""".stripMargin)
}
def partitionColumnNotFoundInSchemaError(col: String, schema: StructType): Throwable = {
new RuntimeException(s"Partition column $col not found in schema $schema")
}
def stateNotDefinedOrAlreadyRemovedError(): Throwable = {
new NoSuchElementException("State is either not defined or has already been removed")
}
def cannotSetTimeoutDurationError(): Throwable = {
new UnsupportedOperationException(
"Cannot set timeout duration without enabling processing time timeout in " +
"[map|flatMap]GroupsWithState")
}
def cannotGetEventTimeWatermarkError(): Throwable = {
new UnsupportedOperationException(
"Cannot get event time watermark timestamp without setting watermark before " +
"[map|flatMap]GroupsWithState")
}
def cannotSetTimeoutTimestampError(): Throwable = {
new UnsupportedOperationException(
"Cannot set timeout timestamp without enabling event time timeout in " +
"[map|flatMapGroupsWithState")
}
def batchMetadataFileNotFoundError(batchMetadataFile: Path): Throwable = {
new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
}
def multiStreamingQueriesUsingPathConcurrentlyError(
path: String, e: FileAlreadyExistsException): Throwable = {
new ConcurrentModificationException(
s"Multiple streaming queries are concurrently using $path", e)
}
def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String): Throwable = {
new UnsupportedOperationException(
s"$commitProtocol does not support adding files with an absolute path")
}
def microBatchUnsupportedByDataSourceError(srcName: String): Throwable = {
new UnsupportedOperationException(
s"Data source $srcName does not support microbatch processing.")
}
def cannotExecuteStreamingRelationExecError(): Throwable = {
new UnsupportedOperationException("StreamingRelationExec cannot be executed")
}
def invalidStreamingOutputModeError(outputMode: Option[OutputMode]): Throwable = {
new UnsupportedOperationException(s"Invalid output mode: $outputMode")
}
}


@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.streaming
import java.io.{FileNotFoundException, IOException, OutputStream}
import java.io.{FileNotFoundException, OutputStream}
import java.util.{EnumSet, UUID}
import scala.util.control.NonFatal
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.internal.Logging
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -257,8 +258,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
if (!overwriteIfPossible && fs.exists(dstPath)) {
throw new FileAlreadyExistsException(
s"Failed to rename $srcPath to $dstPath as destination already exists")
throw QueryExecutionErrors.renamePathAsExistsPathError(srcPath, dstPath)
}
if (!fs.rename(srcPath, dstPath)) {
@@ -266,14 +266,14 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
// This tries to make a best effort attempt to return the most appropriate exception.
if (fs.exists(dstPath)) {
if (!overwriteIfPossible) {
throw new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
throw QueryExecutionErrors.renameAsExistsPathError(dstPath)
}
} else if (!fs.exists(srcPath)) {
throw new FileNotFoundException(s"Failed to rename as $srcPath was not found")
throw QueryExecutionErrors.renameSrcPathNotFoundError(srcPath)
} else {
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
logWarning(msg)
throw new IOException(msg)
val e = QueryExecutionErrors.failedRenameTempFileError(srcPath, dstPath)
logWarning(e.getMessage)
throw e
}
}
}


@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -82,23 +83,7 @@ object FileStreamSink extends Logging {
false
}
if (legacyMetadataPathExists) {
throw new SparkException(
s"""Error: we detected a possible problem with the location of your "_spark_metadata"
|directory and you likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out the
|"_spark_metadata" directory for structured streaming. While this was corrected in
|Spark 3.0, it appears that your query was started using an earlier version that
|incorrectly handled the "_spark_metadata" path.
|
|Correct "_spark_metadata" Directory: $metadataPath
|Incorrect "_spark_metadata" Directory: $legacyMetadataPath
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
.stripMargin)
throw QueryExecutionErrors.legacyMetadataPathExistsError(metadataPath, legacyMetadataPath)
}
}
}
@@ -173,7 +158,7 @@ class FileStreamSink(
val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
val nameEquality = data.sparkSession.sessionState.conf.resolver
data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
throw QueryExecutionErrors.partitionColumnNotFoundInSchemaError(col, data.schema)
}
}
val qe = data.queryExecution


@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.GroupStateImpl._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.unsafe.types.UTF8String
@@ -59,7 +60,7 @@ private[sql] class GroupStateImpl[S] private(
if (defined) {
value
} else {
throw new NoSuchElementException("State is either not defined or has already been removed")
throw QueryExecutionErrors.stateNotDefinedOrAlreadyRemovedError()
}
}
@@ -89,9 +90,7 @@ private[sql] class GroupStateImpl[S] private(
override def setTimeoutDuration(durationMs: Long): Unit = {
if (timeoutConf != ProcessingTimeTimeout) {
throw new UnsupportedOperationException(
"Cannot set timeout duration without enabling processing time timeout in " +
"[map|flatMap]GroupsWithState")
throw QueryExecutionErrors.cannotSetTimeoutDurationError()
}
if (durationMs <= 0) {
throw new IllegalArgumentException("Timeout duration must be positive")
@@ -133,9 +132,7 @@ private[sql] class GroupStateImpl[S] private(
override def getCurrentWatermarkMs(): Long = {
if (!watermarkPresent) {
throw new UnsupportedOperationException(
"Cannot get event time watermark timestamp without setting watermark before " +
"[map|flatMap]GroupsWithState")
throw QueryExecutionErrors.cannotGetEventTimeWatermarkError()
}
eventTimeWatermarkMs
}
@@ -170,9 +167,7 @@ private[sql] class GroupStateImpl[S] private(
private def checkTimeoutTimestampAllowed(): Unit = {
if (timeoutConf != EventTimeTimeout) {
throw new UnsupportedOperationException(
"Cannot set timeout timestamp without enabling event time timeout in " +
"[map|flatMapGroupsWithState")
throw QueryExecutionErrors.cannotSetTimeoutTimestampError()
}
}
}


@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.io._
import java.nio.charset.StandardCharsets
import java.util.ConcurrentModificationException
import scala.reflect.ClassTag
@@ -30,6 +29,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
@@ -150,7 +150,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
IOUtils.closeQuietly(input)
}
} else {
throw new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
}
}
@@ -179,8 +179,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
output.cancel()
// If next batch file already exists, then another concurrently running query has
// written it.
throw new ConcurrentModificationException(
s"Multiple streaming queries are concurrently using $path", e)
throw QueryExecutionErrors.multiStreamingQueriesUsingPathConcurrentlyError(path, e)
case e: Throwable =>
output.cancel()
throw e


@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
@@ -131,8 +132,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
throw new UnsupportedOperationException(
s"$this does not support adding files with an absolute path")
throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {


@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
@@ -101,8 +102,7 @@ class MicroBatchExecution(
StreamingDataSourceV2Relation(output, scan, stream)
})
} else if (v1.isEmpty) {
throw new UnsupportedOperationException(
s"Data source $srcName does not support microbatch processing.")
throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(srcName)
} else {
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch


@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
@@ -88,7 +89,7 @@ case class StreamingExecutionRelation(
case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode {
override def toString: String = sourceName
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException("StreamingRelationExec cannot be executed")
throw QueryExecutionErrors.cannotExecuteStreamingRelationExecError()
}
}


@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.state._
@@ -418,7 +419,7 @@ case class StateStoreSaveExec(
}
}
case _ => throw new UnsupportedOperationException(s"Invalid output mode: $outputMode")
case _ => throw QueryExecutionErrors.invalidStreamingOutputModeError(outputMode)
}
}
}