[SPARK-33602][SQL] Group exception messages in execution/datasources

### What changes were proposed in this pull request?
This PR groups exception messages in `/core/src/main/scala/org/apache/spark/sql/execution/datasources`.
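
For illustration, a minimal sketch of the pattern applied throughout the diff below (the package, method name, and message text are taken from this commit):

```scala
package org.apache.spark.sql.errors

import org.apache.spark.sql.AnalysisException

// Before this PR, call sites built the exception inline:
//   throw new AnalysisException(s"Path does not exist: $path")
// After this PR, call sites delegate to a method grouped in an error
// object, so each message is defined in exactly one place:
private[spark] object QueryCompilationErrors {
  def dataPathNotExistError(path: String): Throwable = {
    new AnalysisException(s"Path does not exist: $path")
  }
}

// Call site:
//   throw QueryCompilationErrors.dataPathNotExistError(path)
```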

### Why are the changes needed?
It will greatly help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests. All existing tests pass, confirming that no existing behavior is broken.

Closes #31757 from beliefer/SPARK-33602.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Authored by gengjiaan on 2021-03-17 14:04:02 +00:00, committed by Wenchen Fan
parent 9f7b0a035b
commit 569fb133d0
22 changed files with 737 additions and 265 deletions


@ -22,15 +22,17 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView}
import org.apache.spark.sql.catalyst.catalog.InvalidUDFClassException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SerdeInfo}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SerdeInfo}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.{toPrettySQL, FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.connector.catalog.{TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{AbstractDataType, DataType, StructField, StructType}
/**
* Object for grouping all error messages of the query compilation.
@ -754,4 +756,335 @@ private[spark] object QueryCompilationErrors {
new AnalysisException(s"Invalid partitionExprs specified: $sortOrders For range " +
"partitioning use REPARTITION_BY_RANGE instead.")
}
def partitionColumnNotSpecifiedError(format: String, partitionColumn: String): Throwable = {
new AnalysisException(s"Failed to resolve the schema for $format for " +
s"the partition column: $partitionColumn. It must be specified manually.")
}
def dataSchemaNotSpecifiedError(format: String): Throwable = {
new AnalysisException(s"Unable to infer schema for $format. It must be specified manually.")
}
def dataPathNotExistError(path: String): Throwable = {
new AnalysisException(s"Path does not exist: $path")
}
def dataSourceOutputModeUnsupportedError(
className: String, outputMode: OutputMode): Throwable = {
new AnalysisException(s"Data source $className does not support $outputMode output mode")
}
def schemaNotSpecifiedForSchemaRelationProviderError(className: String): Throwable = {
new AnalysisException(s"A schema needs to be specified when using $className.")
}
def userSpecifiedSchemaMismatchActualSchemaError(
schema: StructType, actualSchema: StructType): Throwable = {
new AnalysisException(
s"""
|The user-specified schema doesn't match the actual schema:
|user-specified: ${schema.toDDL}, actual: ${actualSchema.toDDL}. If you're using
|DataFrameReader.schema API or creating a table, please do not specify the schema.
|Or if you're scanning an existed table, please drop it and re-create it.
""".stripMargin)
}
def dataSchemaNotSpecifiedError(format: String, fileCatalog: String): Throwable = {
new AnalysisException(
s"Unable to infer schema for $format at $fileCatalog. It must be specified manually")
}
def invalidDataSourceError(className: String): Throwable = {
new AnalysisException(s"$className is not a valid Spark SQL Data Source.")
}
def cannotSaveIntervalIntoExternalStorageError(): Throwable = {
new AnalysisException("Cannot save interval data type into external storage.")
}
def cannotResolveAttributeError(name: String, data: LogicalPlan): Throwable = {
new AnalysisException(
s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
}
def orcNotUsedWithHiveEnabledError(): Throwable = {
new AnalysisException(
s"""
|Hive built-in ORC data source must be used with Hive support enabled.
|Please use the native ORC data source by setting 'spark.sql.orc.impl' to 'native'
""".stripMargin)
}
def failedToFindAvroDataSourceError(provider: String): Throwable = {
new AnalysisException(
s"""
|Failed to find data source: $provider. Avro is built-in but external data
|source module since Spark 2.4. Please deploy the application as per
|the deployment section of "Apache Avro Data Source Guide".
""".stripMargin.replaceAll("\n", " "))
}
def failedToFindKafkaDataSourceError(provider: String): Throwable = {
new AnalysisException(
s"""
|Failed to find data source: $provider. Please deploy the application as
|per the deployment section of "Structured Streaming + Kafka Integration Guide".
""".stripMargin.replaceAll("\n", " "))
}
def findMultipleDataSourceError(provider: String, sourceNames: Seq[String]): Throwable = {
new AnalysisException(
s"""
|Multiple sources found for $provider (${sourceNames.mkString(", ")}),
| please specify the fully qualified class name.
""".stripMargin)
}
def writeEmptySchemasUnsupportedByDataSourceError(): Throwable = {
new AnalysisException(
s"""
|Datasource does not support writing empty or nested empty schemas.
|Please make sure the data schema has at least one or more column(s).
""".stripMargin)
}
def insertMismatchedColumnNumberError(
targetAttributes: Seq[Attribute],
sourceAttributes: Seq[Attribute],
staticPartitionsSize: Int): Throwable = {
new AnalysisException(
s"""
|The data to be inserted needs to have the same number of columns as the
|target table: target table has ${targetAttributes.size} column(s) but the
|inserted data has ${sourceAttributes.size + staticPartitionsSize} column(s),
|which contain $staticPartitionsSize partition column(s) having assigned
|constant values.
""".stripMargin)
}
def insertMismatchedPartitionNumberError(
targetPartitionSchema: StructType,
providedPartitionsSize: Int): Throwable = {
new AnalysisException(
s"""
|The data to be inserted needs to have the same number of partition columns
|as the target table: target table has ${targetPartitionSchema.fields.size}
|partition column(s) but the inserted data has $providedPartitionsSize
|partition columns specified.
""".stripMargin.replaceAll("\n", " "))
}
def invalidPartitionColumnError(
partKey: String, targetPartitionSchema: StructType): Throwable = {
new AnalysisException(
s"""
|$partKey is not a partition column. Partition columns are
|${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}
""".stripMargin)
}
def multiplePartitionColumnValuesSpecifiedError(
field: StructField, potentialSpecs: Map[String, String]): Throwable = {
new AnalysisException(
s"""
|Partition column ${field.name} have multiple values specified,
|${potentialSpecs.mkString("[", ", ", "]")}. Please only specify a single value.
""".stripMargin)
}
def invalidOrderingForConstantValuePartitionColumnError(
targetPartitionSchema: StructType): Throwable = {
new AnalysisException(
s"""
|The ordering of partition columns is
|${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}
|All partition columns having constant values need to appear before other
|partition columns that do not have an assigned constant value.
""".stripMargin)
}
def cannotWriteDataToRelationsWithMultiplePathsError(): Throwable = {
new AnalysisException("Can only write data to relations with a single path.")
}
def failedToRebuildExpressionError(filter: Filter): Throwable = {
new AnalysisException(
s"Fail to rebuild expression: missing key $filter in `translatedFilterToExpr`")
}
def dataTypeUnsupportedByDataSourceError(format: String, field: StructField): Throwable = {
new AnalysisException(
s"$format data source does not support ${field.dataType.catalogString} data type.")
}
def failToResolveDataSourceForTableError(table: CatalogTable, key: String): Throwable = {
new AnalysisException(
s"""
|Fail to resolve data source for the table ${table.identifier} since the table
|serde property has the duplicated key $key with extra options specified for this
|scan operation. To fix this, you can rollback to the legacy behavior of ignoring
|the extra options by setting the config
|${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `false`, or address the
|conflicts of the same config.
""".stripMargin)
}
def outputPathAlreadyExistsError(outputPath: Path): Throwable = {
new AnalysisException(s"path $outputPath already exists.")
}
def cannotUseDataTypeForPartitionColumnError(field: StructField): Throwable = {
new AnalysisException(s"Cannot use ${field.dataType} for partition column")
}
def cannotUseAllColumnsForPartitionColumnsError(): Throwable = {
new AnalysisException(s"Cannot use all columns for partition columns")
}
def partitionColumnNotFoundInSchemaError(col: String, schemaCatalog: String): Throwable = {
new AnalysisException(s"Partition column `$col` not found in schema $schemaCatalog")
}
def columnNotFoundInSchemaError(
col: StructField, tableSchema: Option[StructType]): Throwable = {
new AnalysisException(s"""Column "${col.name}" not found in schema $tableSchema""")
}
def unsupportedDataSourceTypeForDirectQueryOnFilesError(className: String): Throwable = {
new AnalysisException(s"Unsupported data source type for direct query on files: $className")
}
def saveDataIntoViewNotAllowedError(): Throwable = {
new AnalysisException("Saving data into a view is not allowed.")
}
def mismatchedTableFormatError(
tableName: String, existingProvider: Class[_], specifiedProvider: Class[_]): Throwable = {
new AnalysisException(
s"""
|The format of the existing table $tableName is `${existingProvider.getSimpleName}`.
|It doesn't match the specified format `${specifiedProvider.getSimpleName}`.
""".stripMargin)
}
def mismatchedTableLocationError(
identifier: TableIdentifier,
existingTable: CatalogTable,
tableDesc: CatalogTable): Throwable = {
new AnalysisException(
s"""
|The location of the existing table ${identifier.quotedString} is
|`${existingTable.location}`. It doesn't match the specified location
|`${tableDesc.location}`.
""".stripMargin)
}
def mismatchedTableColumnNumberError(
tableName: String,
existingTable: CatalogTable,
query: LogicalPlan): Throwable = {
new AnalysisException(
s"""
|The column number of the existing table $tableName
|(${existingTable.schema.catalogString}) doesn't match the data schema
|(${query.schema.catalogString})
""".stripMargin)
}
def cannotResolveColumnGivenInputColumnsError(col: String, inputColumns: String): Throwable = {
new AnalysisException(s"cannot resolve '$col' given input columns: [$inputColumns]")
}
def mismatchedTablePartitionColumnError(
tableName: String,
specifiedPartCols: Seq[String],
existingPartCols: String): Throwable = {
new AnalysisException(
s"""
|Specified partitioning does not match that of the existing table $tableName.
|Specified partition columns: [${specifiedPartCols.mkString(", ")}]
|Existing partition columns: [$existingPartCols]
""".stripMargin)
}
def mismatchedTableBucketingError(
tableName: String,
specifiedBucketString: String,
existingBucketString: String): Throwable = {
new AnalysisException(
s"""
|Specified bucketing does not match that of the existing table $tableName.
|Specified bucketing: $specifiedBucketString
|Existing bucketing: $existingBucketString
""".stripMargin)
}
def specifyPartitionNotAllowedWhenTableSchemaNotDefinedError(): Throwable = {
new AnalysisException("It is not allowed to specify partitioning when the " +
"table schema is not defined.")
}
def bucketingColumnCannotBePartOfPartitionColumnsError(
bucketCol: String, normalizedPartCols: Seq[String]): Throwable = {
new AnalysisException(s"bucketing column '$bucketCol' should not be part of " +
s"partition columns '${normalizedPartCols.mkString(", ")}'")
}
def bucketSortingColumnCannotBePartOfPartitionColumnsError(
sortCol: String, normalizedPartCols: Seq[String]): Throwable = {
new AnalysisException(s"bucket sorting column '$sortCol' should not be part of " +
s"partition columns '${normalizedPartCols.mkString(", ")}'")
}
def mismatchedInsertedDataColumnNumberError(
tableName: String, insert: InsertIntoStatement, staticPartCols: Set[String]): Throwable = {
new AnalysisException(
s"$tableName requires that the data to be inserted have the same number of columns as " +
s"the target table: target table has ${insert.table.output.size} column(s) but the " +
s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " +
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
}
def requestedPartitionsMismatchTablePartitionsError(
tableName: String,
normalizedPartSpec: Map[String, Option[String]],
partColNames: StructType): Throwable = {
new AnalysisException(
s"""
|Requested partitioning does not match the table $tableName:
|Requested partitions: ${normalizedPartSpec.keys.mkString(",")}
|Table partitions: ${partColNames.mkString(",")}
""".stripMargin)
}
def ddlWithoutHiveSupportEnabledError(detail: String): Throwable = {
new AnalysisException(s"Hive support is required to $detail")
}
def createTableColumnTypesOptionColumnNotFoundInSchemaError(
col: String, schema: StructType): Throwable = {
new AnalysisException(
s"createTableColumnTypes option column $col not found in schema ${schema.catalogString}")
}
def parquetTypeUnsupportedError(parquetType: String): Throwable = {
new AnalysisException(s"Parquet type not supported: $parquetType")
}
def parquetTypeUnsupportedYetError(parquetType: String): Throwable = {
new AnalysisException(s"Parquet type not yet supported: $parquetType")
}
def illegalParquetTypeError(parquetType: String): Throwable = {
new AnalysisException(s"Illegal Parquet type: $parquetType")
}
def unrecognizedParquetTypeError(field: String): Throwable = {
new AnalysisException(s"Unrecognized Parquet type: $field")
}
def cannotConvertDataTypeToParquetTypeError(field: StructField): Throwable = {
new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
}
}


@ -17,19 +17,20 @@
package org.apache.spark.sql.errors
import java.io.IOException
import java.io.{FileNotFoundException, IOException}
import java.net.URISyntaxException
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.DateTimeException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException
import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.catalyst.expressions.{Expression, UnevaluableAggregate}
import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.sql.types.{DataType, Decimal, StructType}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String
@ -190,6 +191,10 @@ object QueryExecutionErrors {
new UnsupportedOperationException(s"Unexpected data type ${dataType.catalogString}")
}
def typeUnsupportedError(dataType: DataType): Throwable = {
new IllegalArgumentException(s"Unexpected type $dataType")
}
def negativeValueUnexpectedError(frequencyExpression : Expression): Throwable = {
new SparkException(s"Negative values found in ${frequencyExpression.sql}")
}
@ -322,4 +327,233 @@ object QueryExecutionErrors {
def compilerError(e: CompileException): Throwable = {
new CompileException(failedToCompileMsg(e), e.getLocation)
}
def dataPathNotSpecifiedError(): Throwable = {
new IllegalArgumentException("'path' is not specified")
}
def createStreamingSourceNotSpecifySchemaError(): Throwable = {
new IllegalArgumentException(
s"""
|Schema must be specified when creating a streaming source DataFrame. If some
|files already exist in the directory, then depending on the file format you
|may be able to create a static DataFrame on that directory with
|'spark.read.load(directory)' and infer schema from it.
""".stripMargin)
}
def streamedOperatorUnsupportedByDataSourceError(
className: String, operator: String): Throwable = {
new UnsupportedOperationException(
s"Data source $className does not support streamed $operator")
}
def multiplePathsSpecifiedError(allPaths: Seq[String]): Throwable = {
new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}
def failedToFindDataSourceError(provider: String, error: Throwable): Throwable = {
new ClassNotFoundException(
s"""
|Failed to find data source: $provider. Please find packages at
|http://spark.apache.org/third-party-projects.html
""".stripMargin, error)
}
def removedClassInSpark2Error(className: String, e: Throwable): Throwable = {
new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
"Please check if your library is compatible with Spark 2.0", e)
}
def incompatibleDataSourceRegisterError(e: Throwable): Throwable = {
new ClassNotFoundException(
s"""
|Detected an incompatible DataSourceRegister. Please remove the incompatible
|library from classpath or upgrade it. Error: ${e.getMessage}
""".stripMargin, e)
}
def unrecognizedFileFormatError(format: String): Throwable = {
new IllegalStateException(s"unrecognized format $format")
}
def sparkUpgradeInReadingDatesError(
format: String, config: String, option: String): SparkUpgradeException = {
new SparkUpgradeException("3.0",
s"""
|reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from $format
|files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
|Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
|Gregorian calendar. See more details in SPARK-31404. You can set the SQL config
|'$config' or the datasource option '$option' to 'LEGACY' to rebase the datetime values
|w.r.t. the calendar difference during reading. To read the datetime values as it is,
|set the SQL config '$config' or the datasource option '$option' to 'CORRECTED'.
""".stripMargin, null)
}
def sparkUpgradeInWritingDatesError(format: String, config: String): SparkUpgradeException = {
new SparkUpgradeException("3.0",
s"""
|writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into $format
|files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive
|later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
|Gregorian calendar. See more details in SPARK-31404. You can set $config to 'LEGACY' to
|rebase the datetime values w.r.t. the calendar difference during writing, to get maximum
|interoperability. Or set $config to 'CORRECTED' to write the datetime values as it is,
|if you are 100% sure that the written files will only be read by Spark 3.0+ or other
|systems that use Proleptic Gregorian calendar.
""".stripMargin, null)
}
def buildReaderUnsupportedForFileFormatError(format: String): Throwable = {
new UnsupportedOperationException(s"buildReader is not supported for $format")
}
def jobAbortedError(cause: Throwable): Throwable = {
new SparkException("Job aborted.", cause)
}
def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
new SparkException("Task failed while writing rows.", cause)
}
def readCurrentFileNotFoundError(e: FileNotFoundException): Throwable = {
new FileNotFoundException(
s"""
|${e.getMessage}\n
|It is possible the underlying files have been updated. You can explicitly invalidate
|the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
|recreating the Dataset/DataFrame involved.
""".stripMargin)
}
def unsupportedSaveModeError(saveMode: String, pathExists: Boolean): Throwable = {
new IllegalStateException(s"unsupported save mode $saveMode ($pathExists)")
}
def cannotClearOutputDirectoryError(staticPrefixPath: Path): Throwable = {
new IOException(s"Unable to clear output directory $staticPrefixPath prior to writing to it")
}
def cannotClearPartitionDirectoryError(path: Path): Throwable = {
new IOException(s"Unable to clear partition directory $path prior to writing to it")
}
def failedToCastValueToDataTypeForPartitionColumnError(
value: String, dataType: DataType, columnName: String): Throwable = {
new RuntimeException(s"Failed to cast value `$value` to " +
s"`$dataType` for partition column `$columnName`")
}
def endOfStreamError(): Throwable = {
new NoSuchElementException("End of stream")
}
def writeUnsupportedForBinaryFileDataSourceError(): Throwable = {
new UnsupportedOperationException("Write is not supported for binary file data source")
}
def fileLengthExceedsMaxLengthError(status: FileStatus, maxLength: Int): Throwable = {
new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
}
def unsupportedFieldNameError(fieldName: String): Throwable = {
new RuntimeException(s"Unsupported field name: ${fieldName}")
}
def cannotSpecifyBothJdbcTableNameAndQueryError(
jdbcTableName: String, jdbcQueryString: String): Throwable = {
new IllegalArgumentException(
s"Both '$jdbcTableName' and '$jdbcQueryString' can not be specified at the same time.")
}
def missingJdbcTableNameAndQueryError(
jdbcTableName: String, jdbcQueryString: String): Throwable = {
new IllegalArgumentException(
s"Option '$jdbcTableName' or '$jdbcQueryString' is required."
)
}
def emptyOptionError(optionName: String): Throwable = {
new IllegalArgumentException(s"Option `$optionName` can not be empty.")
}
def invalidJdbcTxnIsolationLevelError(jdbcTxnIsolationLevel: String, value: String): Throwable = {
new IllegalArgumentException(
s"Invalid value `$value` for parameter `$jdbcTxnIsolationLevel`. This can be " +
"`NONE`, `READ_UNCOMMITTED`, `READ_COMMITTED`, `REPEATABLE_READ` or `SERIALIZABLE`.")
}
def cannotGetJdbcTypeError(dt: DataType): Throwable = {
new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}")
}
def unrecognizedSqlTypeError(sqlType: Int): Throwable = {
new SQLException(s"Unrecognized SQL type $sqlType")
}
def unsupportedJdbcTypeError(content: String): Throwable = {
new SQLException(s"Unsupported type $content")
}
def unsupportedArrayElementTypeBasedOnBinaryError(dt: DataType): Throwable = {
new IllegalArgumentException(s"Unsupported array element " +
s"type ${dt.catalogString} based on binary")
}
def nestedArraysUnsupportedError(): Throwable = {
new IllegalArgumentException("Nested arrays unsupported")
}
def cannotTranslateNonNullValueForFieldError(pos: Int): Throwable = {
new IllegalArgumentException(s"Can't translate non-null value for field $pos")
}
def invalidJdbcNumPartitionsError(n: Int, jdbcNumPartitions: String): Throwable = {
new IllegalArgumentException(
s"Invalid value `$n` for parameter `$jdbcNumPartitions` in table writing " +
"via JDBC. The minimum value is 1.")
}
def transactionUnsupportedByJdbcServerError(): Throwable = {
new SQLFeatureNotSupportedException("The target JDBC server does not support " +
"transaction and can only support ALTER TABLE with a single action.")
}
def dataTypeUnsupportedYetError(dataType: DataType): Throwable = {
new UnsupportedOperationException(s"$dataType is not supported yet.")
}
def unsupportedOperationForDataTypeError(dataType: DataType): Throwable = {
new UnsupportedOperationException(s"DataType: ${dataType.catalogString}")
}
def inputFilterNotFullyConvertibleError(owner: String): Throwable = {
new SparkException(s"The input filter of $owner should be fully convertible.")
}
def cannotReadFooterForFileError(file: Path, e: IOException): Throwable = {
new SparkException(s"Could not read footer for file: $file", e)
}
def cannotReadFooterForFileError(file: FileStatus, e: RuntimeException): Throwable = {
new IOException(s"Could not read footer for file: $file", e)
}
def foundDuplicateFieldInCaseInsensitiveModeError(
requiredFieldName: String, matchedOrcFields: String): Throwable = {
new RuntimeException(
s"""
|Found duplicate field(s) "$requiredFieldName": $matchedOrcFields
|in case-insensitive mode
""".stripMargin.replaceAll("\n", " "))
}
def failedToMergeIncompatibleSchemasError(
left: StructType, right: StructType, e: Throwable): Throwable = {
new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
}


@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@ -190,8 +191,8 @@ case class DataSource(
}
inferredOpt
}.getOrElse {
throw new AnalysisException(s"Failed to resolve the schema for $format for " +
s"the partition column: $partitionColumn. It must be specified manually.")
throw QueryCompilationErrors.partitionColumnNotSpecifiedError(
format.toString, partitionColumn)
}
}
StructType(partitionFields)
@ -208,8 +209,7 @@ case class DataSource(
caseInsensitiveOptions - "path",
tempFileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
throw QueryCompilationErrors.dataSchemaNotSpecifiedError(format.toString)
}
// We just print a warning message if the data schema and partition schema have the duplicate
@ -238,7 +238,7 @@ case class DataSource(
case format: FileFormat =>
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
throw QueryExecutionErrors.dataPathNotSpecifiedError()
})
// Check whether the path exists if it is not a glob pattern.
@ -248,7 +248,7 @@ case class DataSource(
if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
if (!fs.exists(hdfsPath)) {
throw new AnalysisException(s"Path does not exist: $path")
throw QueryCompilationErrors.dataPathNotExistError(path)
}
}
@ -256,11 +256,7 @@ case class DataSource(
val isTextSource = providingClass == classOf[text.TextFileFormat]
// If the schema inference is disabled, only text sources require schema to be specified
if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
throw new IllegalArgumentException(
"Schema must be specified when creating a streaming source DataFrame. " +
"If some files already exist in the directory, then depending on the file format " +
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
throw QueryExecutionErrors.createStreamingSourceNotSpecifySchemaError()
}
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => {
@ -280,8 +276,8 @@ case class DataSource(
partitionSchema.fieldNames)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
throw QueryExecutionErrors.streamedOperatorUnsupportedByDataSourceError(
className, "reading")
}
}
@ -298,7 +294,7 @@ case class DataSource(
case format: FileFormat =>
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
throw QueryExecutionErrors.dataPathNotSpecifiedError()
})
new FileStreamSource(
sparkSession = sparkSession,
@ -309,8 +305,8 @@ case class DataSource(
metadataPath = metadataPath,
options = caseInsensitiveOptions)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
throw QueryExecutionErrors.streamedOperatorUnsupportedByDataSourceError(
className, "reading")
}
}
@ -322,17 +318,16 @@ case class DataSource(
case fileFormat: FileFormat =>
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
throw QueryExecutionErrors.dataPathNotSpecifiedError()
})
if (outputMode != OutputMode.Append) {
throw new AnalysisException(
s"Data source $className does not support $outputMode output mode")
throw QueryCompilationErrors.dataSourceOutputModeUnsupportedError(className, outputMode)
}
new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")
throw QueryExecutionErrors.streamedOperatorUnsupportedByDataSourceError(
className, "writing")
}
}
@ -354,17 +349,13 @@ case class DataSource(
case (dataSource: RelationProvider, None) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
throw QueryCompilationErrors.schemaNotSpecifiedForSchemaRelationProviderError(className)
case (dataSource: RelationProvider, Some(schema)) =>
val baseRelation =
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
if (baseRelation.schema != schema) {
throw new AnalysisException(
"The user-specified schema doesn't match the actual schema: " +
s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
"you're using DataFrameReader.schema API or creating a table, please do not " +
"specify the schema. Or if you're scanning an existed table, please drop " +
"it and re-create it.")
throw QueryCompilationErrors.userSpecifiedSchemaMismatchActualSchemaError(
schema, baseRelation.schema)
}
baseRelation
@ -386,9 +377,8 @@ case class DataSource(
caseInsensitiveOptions - "path",
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
"It must be specified manually")
throw QueryCompilationErrors.dataSchemaNotSpecifiedError(
format.toString, fileCatalog.allFiles().mkString(","))
}
HadoopFsRelation(
@ -429,8 +419,7 @@ case class DataSource(
caseInsensitiveOptions)(sparkSession)
case _ =>
throw new AnalysisException(
s"$className is not a valid Spark SQL Data Source.")
throw QueryCompilationErrors.invalidDataSourceError(className)
}
relation match {
@ -470,8 +459,7 @@ case class DataSource(
val fs = path.getFileSystem(newHadoopConfiguration())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
throw QueryExecutionErrors.multiplePathsSpecifiedError(allPaths)
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@ -523,7 +511,7 @@ case class DataSource(
metrics: Map[String, SQLMetric]): BaseRelation = {
val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
}
providingInstance() match {
@ -540,8 +528,7 @@ case class DataSource(
assert(unresolved.nameParts.length == 1)
val name = unresolved.nameParts.head
outputColumns.find(a => equality(a.name, name)).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
throw QueryCompilationErrors.cannotResolveAttributeError(name, data)
}
}
val resolved = cmd.copy(
@ -561,7 +548,7 @@ case class DataSource(
*/
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
}
providingInstance() match {
@ -669,27 +656,15 @@ object DataSource extends Logging {
dataSource
case Failure(error) =>
if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
throw new AnalysisException(
"Hive built-in ORC data source must be used with Hive support enabled. " +
"Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
"'native'")
throw QueryCompilationErrors.orcNotUsedWithHiveEnabledError()
} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro" ||
provider1 == "org.apache.spark.sql.avro") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Avro is built-in but external data " +
"source module since Spark 2.4. Please deploy the application as per " +
"the deployment section of \"Apache Avro Data Source Guide\".")
throw QueryCompilationErrors.failedToFindAvroDataSourceError(provider1)
} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Please deploy the application as " +
"per the deployment section of " +
"\"Structured Streaming + Kafka Integration Guide\".")
throw QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1)
} else {
throw new ClassNotFoundException(
s"Failed to find data source: $provider1. Please find packages at " +
"http://spark.apache.org/third-party-projects.html",
error)
throw QueryExecutionErrors.failedToFindDataSourceError(provider1, error)
}
}
} catch {
@ -697,8 +672,7 @@ object DataSource extends Logging {
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
"Please check if your library is compatible with Spark 2.0", e)
throw QueryExecutionErrors.removedClassInSpark2Error(className, e)
} else {
throw e
}
@ -717,8 +691,7 @@ object DataSource extends Logging {
s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
throw new AnalysisException(s"Multiple sources found for $provider1 " +
s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
throw QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames)
}
}
} catch {
@ -726,9 +699,7 @@ object DataSource extends Logging {
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getCause.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
"Please remove the incompatible library from classpath or upgrade it. " +
s"Error: ${e.getMessage}", e)
throw QueryExecutionErrors.incompatibleDataSourceRegisterError(e)
} else {
throw e
}
@ -789,7 +760,7 @@ object DataSource extends Logging {
}
if (checkEmptyGlobPath && globResult.isEmpty) {
throw new AnalysisException(s"Path does not exist: $globPath")
throw QueryCompilationErrors.dataPathNotExistError(globPath.toString)
}
globResult
@ -803,7 +774,7 @@ object DataSource extends Logging {
ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
val fs = path.getFileSystem(hadoopConf)
if (!fs.exists(path)) {
throw new AnalysisException(s"Path does not exist: $path")
throw QueryCompilationErrors.dataPathNotExistError(path.toString)
}
}
} catch {
@ -855,11 +826,7 @@ object DataSource extends Logging {
if (hasEmptySchema(schema)) {
throw new AnalysisException(
s"""
|Datasource does not support writing empty or nested empty schemas.
|Please make sure the data schema has at least one or more column(s).
""".stripMargin)
throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
}
}
}


@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.SupportsRead
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.streaming.StreamingRelation
@ -76,28 +77,19 @@ object DataSourceAnalysis extends Rule[LogicalPlan] with CastSupport {
// The sum of the number of static partition columns and columns provided in the SELECT
// clause needs to match the number of columns of the target table.
if (staticPartitions.size + sourceAttributes.size != targetAttributes.size) {
throw new AnalysisException(
s"The data to be inserted needs to have the same number of " +
s"columns as the target table: target table has ${targetAttributes.size} " +
s"column(s) but the inserted data has ${sourceAttributes.size + staticPartitions.size} " +
s"column(s), which contain ${staticPartitions.size} partition column(s) having " +
s"assigned constant values.")
throw QueryCompilationErrors.insertMismatchedColumnNumberError(
targetAttributes, sourceAttributes, staticPartitions.size)
}
if (providedPartitions.size != targetPartitionSchema.fields.size) {
throw new AnalysisException(
s"The data to be inserted needs to have the same number of " +
s"partition columns as the target table: target table " +
s"has ${targetPartitionSchema.fields.size} partition column(s) but the inserted " +
s"data has ${providedPartitions.size} partition columns specified.")
throw QueryCompilationErrors.insertMismatchedPartitionNumberError(
targetPartitionSchema, providedPartitions.size)
}
staticPartitions.foreach {
case (partKey, partValue) =>
case (partKey, _) =>
if (!targetPartitionSchema.fields.exists(field => resolver(field.name, partKey))) {
throw new AnalysisException(
s"$partKey is not a partition column. Partition columns are " +
s"${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}")
throw QueryCompilationErrors.invalidPartitionColumnError(partKey, targetPartitionSchema)
}
}
@ -121,9 +113,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] with CastSupport {
Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
}
} else {
throw new AnalysisException(
s"Partition column ${field.name} have multiple values specified, " +
s"${potentialSpecs.mkString("[", ", ", "]")}. Please only specify a single value.")
throw QueryCompilationErrors.multiplePartitionColumnValuesSpecifiedError(
field, potentialSpecs)
}
}
@ -131,11 +122,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] with CastSupport {
// any static partition appear after dynamic partitions.
partitionList.dropWhile(_.isDefined).collectFirst {
case Some(_) =>
throw new AnalysisException(
s"The ordering of partition columns is " +
s"${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}. " +
"All partition columns having constant values need to appear before other " +
"partition columns that do not have an assigned constant value.")
throw QueryCompilationErrors.invalidOrderingForConstantValuePartitionColumnError(
targetPartitionSchema)
}
assert(partitionList.take(staticPartitions.size).forall(_.isDefined))
@ -199,7 +187,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] with CastSupport {
// Sanity check
if (t.location.rootPaths.size != 1) {
throw new AnalysisException("Can only write data to relations with a single path.")
throw QueryCompilationErrors.cannotWriteDataToRelationsWithMultiplePathsError()
}
val outputPath = t.location.rootPaths.head
@ -653,8 +641,7 @@ object DataSourceStrategy
expressions.Not(rebuildExpressionFromFilter(pred, translatedFilterToExpr))
case other =>
translatedFilterToExpr.getOrElse(other,
throw new AnalysisException(
s"Fail to rebuild expression: missing key $filter in `translatedFilterToExpr`"))
throw QueryCompilationErrors.failedToRebuildExpressionError(filter))
}
}


@ -27,9 +27,9 @@ import org.json4s.jackson.Serialization
import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
@ -65,8 +65,7 @@ object DataSourceUtils {
def verifySchema(format: FileFormat, schema: StructType): Unit = {
schema.foreach { field =>
if (!format.supportDataType(field.dataType)) {
throw new AnalysisException(
s"$format data source does not support ${field.dataType.catalogString} data type.")
throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(format.toString, field)
}
}
}
@ -140,16 +139,9 @@ object DataSourceUtils {
(SQLConf.PARQUET_REBASE_MODE_IN_READ.key, ParquetOptions.DATETIME_REBASE_MODE)
case "Avro" =>
(SQLConf.AVRO_REBASE_MODE_IN_READ.key, "datetimeRebaseMode")
case _ => throw new IllegalStateException("unrecognized format " + format)
case _ => throw QueryExecutionErrors.unrecognizedFileFormatError(format)
}
new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " +
"Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is " +
"different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " +
s"SPARK-31404. You can set the SQL config '$config' or the datasource option '$option' to " +
"'LEGACY' to rebase the datetime values w.r.t. the calendar difference during reading. " +
s"To read the datetime values as it is, set the SQL config '$config' or " +
s"the datasource option '$option' to 'CORRECTED'.", null)
QueryExecutionErrors.sparkUpgradeInReadingDatesError(format, config, option)
}
def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
@ -157,17 +149,9 @@ object DataSourceUtils {
case "Parquet INT96" => SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key
case "Parquet" => SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key
case "Avro" => SQLConf.AVRO_REBASE_MODE_IN_WRITE.key
case _ => throw new IllegalStateException("unrecognized format " + format)
case _ => throw QueryExecutionErrors.unrecognizedFileFormatError(format)
}
new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " +
s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " +
"Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is " +
"different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " +
s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. " +
"the calendar difference during writing, to get maximum interoperability. Or set " +
s"$config to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that " +
"the written files will only be read by Spark 3.0+ or other systems that use Proleptic " +
"Gregorian calendar.", null)
QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config)
}
def creteDateRebaseFuncInRead(
@ -226,13 +210,7 @@ object DataSourceUtils {
// Check the same key with different values
table.storage.properties.foreach { case (k, v) =>
if (extraOptions.containsKey(k) && extraOptions.get(k) != v) {
throw new AnalysisException(
s"Fail to resolve data source for the table ${table.identifier} since the table " +
s"serde property has the duplicated key $k with extra options specified for this " +
"scan operation. To fix this, you can rollback to the legacy behavior of ignoring " +
"the extra options by setting the config " +
s"${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `false`, or address the " +
s"conflicts of the same config.")
throw QueryCompilationErrors.failToResolveDataSourceForTableError(table, k)
}
}
// To keep the original key from table properties, here we filter all case insensitive


@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, StructType}
@ -110,7 +111,7 @@ trait FileFormat {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
throw QueryExecutionErrors.buildReaderUnsupportedForFileFormatError(this.toString)
}
/**


@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
@ -229,7 +230,7 @@ object FileFormatWriter extends Logging {
} catch { case cause: Throwable =>
logError(s"Aborting job ${description.uuid}.", cause)
committer.abortJob(job)
throw new SparkException("Job aborted.", cause)
throw QueryExecutionErrors.jobAbortedError(cause)
}
}
@ -294,7 +295,7 @@ object FileFormatWriter extends Logging {
// We throw the exception and let Executor throw ExceptionFailure to abort the job.
throw new TaskOutputFileAlreadyExistException(f)
case t: Throwable =>
throw new SparkException("Task failed while writing rows.", t)
throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
}
}


@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator
@ -116,12 +117,7 @@ class FileScanRDD(
readFunction(currentFile)
} catch {
case e: FileNotFoundException =>
throw new FileNotFoundException(
e.getMessage + "\n" +
"It is possible the underlying files have been updated. " +
"You can explicitly invalidate the cache in Spark by " +
"running 'REFRESH TABLE tableName' command in SQL or " +
"by recreating the Dataset/DataFrame involved.")
throw QueryExecutionErrors.readCurrentFileNotFoundError(e)
}
}


@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol
@ -29,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.SQLConf
@ -119,7 +118,7 @@ case class InsertIntoHadoopFsRelationCommand(
val pathExists = fs.exists(qualifiedOutputPath)
(mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
throw QueryCompilationErrors.outputPathAlreadyExistsError(qualifiedOutputPath)
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
@ -135,7 +134,7 @@ case class InsertIntoHadoopFsRelationCommand(
case (SaveMode.Ignore, exists) =>
!exists
case (s, exists) =>
throw new IllegalStateException(s"unsupported save mode $s ($exists)")
throw QueryExecutionErrors.unsupportedSaveModeError(s.toString, exists)
}
}
@ -234,8 +233,7 @@ case class InsertIntoHadoopFsRelationCommand(
// first clear the path determined by the static partition keys (e.g. /table/foo=1)
val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
throw new IOException(s"Unable to clear output " +
s"directory $staticPrefixPath prior to writing to it")
throw QueryExecutionErrors.cannotClearOutputDirectoryError(staticPrefixPath)
}
// now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
for ((spec, customLoc) <- customPartitionLocations) {
@ -244,8 +242,7 @@ case class InsertIntoHadoopFsRelationCommand(
"Custom partition location did not match static partitioning keys")
val path = new Path(customLoc)
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
throw new IOException(s"Unable to clear partition " +
s"directory $path prior to writing to it")
throw QueryExecutionErrors.cannotClearPartitionDirectoryError(path)
}
}
}


@ -29,12 +29,12 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.unsafe.types.UTF8String
@ -199,8 +199,8 @@ object PartitioningUtils {
} catch {
case NonFatal(_) =>
if (validatePartitionColumns) {
throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " +
s"`${typedValue.dataType}` for partition column `$columnName`")
throw QueryExecutionErrors.failedToCastValueToDataTypeForPartitionColumnError(
typedValue.value, typedValue.dataType, columnName)
} else null
}
}
@ -527,7 +527,7 @@ object PartitioningUtils {
}.getOrElse {
Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), TimestampType).eval()
}
case dt => throw new IllegalArgumentException(s"Unexpected type $dt")
case dt => throw QueryExecutionErrors.typeUnsupportedError(dt)
}
def validatePartitionColumn(
@ -541,12 +541,12 @@ object PartitioningUtils {
partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
}
}
if (partitionColumns.nonEmpty && partitionColumns.size == schema.fields.length) {
throw new AnalysisException(s"Cannot use all columns for partition columns")
throw QueryCompilationErrors.cannotUseAllColumnsForPartitionColumnsError()
}
}
@ -558,7 +558,7 @@ object PartitioningUtils {
StructType(partitionColumns.map { col =>
schema.find(f => equality(f.name, col)).getOrElse {
val schemaCatalog = schema.catalogString
throw new AnalysisException(s"Partition column `$col` not found in schema $schemaCatalog")
throw QueryCompilationErrors.partitionColumnNotFoundInSchemaError(col, schemaCatalog)
}
}).asNullable
}


@ -21,6 +21,8 @@ import java.io.Closeable
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
*
@ -48,7 +50,7 @@ class RecordReaderIterator[T](
override def next(): T = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
throw QueryExecutionErrors.endOfStreamError()
}
havePair = false
rowReader.getCurrentValue


@ -25,11 +25,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
@ -69,7 +69,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
throw new UnsupportedOperationException("Write is not supported for binary file data source")
throw QueryExecutionErrors.writeUnsupportedForBinaryFileDataSourceError()
}
override def isSplitable(
@ -114,9 +114,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
writer.write(i, DateTimeUtils.millisToMicros(status.getModificationTime))
case (CONTENT, i) =>
if (status.getLen > maxLength) {
throw new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
throw QueryExecutionErrors.fileLengthExceedsMaxLengthError(status, maxLength)
}
val stream = fs.open(status.getPath)
try {
@ -125,7 +123,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
Closeables.close(stream, true)
}
case (other, _) =>
throw new RuntimeException(s"Unsupported field name: ${other}")
throw QueryExecutionErrors.unsupportedFieldNameError(other)
}
Iterator.single(writer.getRow)
} else {


@ -25,6 +25,7 @@ import org.apache.commons.io.FilenameUtils
import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* Options for the JDBC data source.
@ -73,22 +74,20 @@ class JDBCOptions(
// table name or a table subquery.
val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
case (Some(name), Some(subquery)) =>
throw new IllegalArgumentException(
s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time."
)
throw QueryExecutionErrors.cannotSpecifyBothJdbcTableNameAndQueryError(
JDBC_TABLE_NAME, JDBC_QUERY_STRING)
case (None, None) =>
throw new IllegalArgumentException(
s"Option '$JDBC_TABLE_NAME' or '$JDBC_QUERY_STRING' is required."
)
throw QueryExecutionErrors.missingJdbcTableNameAndQueryError(
JDBC_TABLE_NAME, JDBC_QUERY_STRING)
case (Some(name), None) =>
if (name.isEmpty) {
throw new IllegalArgumentException(s"Option '$JDBC_TABLE_NAME' can not be empty.")
throw QueryExecutionErrors.emptyOptionError(JDBC_TABLE_NAME)
} else {
name.trim
}
case (None, Some(subquery)) =>
if (subquery.isEmpty) {
throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
} else {
s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
}
@ -180,10 +179,8 @@ class JDBCOptions(
case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
case other => throw new IllegalArgumentException(
s"Invalid value `$other` for parameter `$JDBC_TXN_ISOLATION_LEVEL`. This can be " +
"`NONE`, `READ_UNCOMMITTED`, `READ_COMMITTED`, `REPEATABLE_READ` or `SERIALIZABLE`."
)
case other => throw QueryExecutionErrors.invalidJdbcTxnIsolationLevelError(
JDBC_TXN_ISOLATION_LEVEL, other)
}
// An option to execute custom SQL before fetching data from the remote DB
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
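A sketch of the JDBCOptions helpers, reconstructed from the call sites and removed messages above. Only `IllegalArgumentException` appears in the replaced code, so that type is assumed throughout; note that the old code quoted the empty option name with single quotes in one branch and backticks in the other, so a single `emptyOptionError` can keep only one of those forms.

```scala
// Hypothetical sketch; parameter names and the enclosing object are illustrative.
object JdbcOptionErrorsSketch {
  def cannotSpecifyBothJdbcTableNameAndQueryError(
      jdbcTableName: String, jdbcQueryString: String): Throwable =
    new IllegalArgumentException(
      s"Both '$jdbcTableName' and '$jdbcQueryString' can not be specified at the same time.")

  def missingJdbcTableNameAndQueryError(
      jdbcTableName: String, jdbcQueryString: String): Throwable =
    new IllegalArgumentException(
      s"Option '$jdbcTableName' or '$jdbcQueryString' is required.")

  def emptyOptionError(optionName: String): Throwable =
    new IllegalArgumentException(s"Option `$optionName` can not be empty.")

  def invalidJdbcTxnIsolationLevelError(
      jdbcTxnIsolationLevel: String, value: String): Throwable =
    new IllegalArgumentException(
      s"Invalid value `$value` for parameter `$jdbcTxnIsolationLevel`. This can be " +
        "`NONE`, `READ_UNCOMMITTED`, `READ_COMMITTED`, `REPEATABLE_READ` or `SERIALIZABLE`.")
}
```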


@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.time.{Instant, LocalDate}
import java.util.Locale
import java.util.concurrent.TimeUnit
@ -28,7 +28,7 @@ import scala.util.control.NonFatal
import org.apache.spark.TaskContext
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
@ -141,7 +142,7 @@ object JdbcUtils extends Logging {
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
throw new AnalysisException(s"""Column "${col.name}" not found in schema $tableSchema""")
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
}.mkString(",")
@ -177,7 +178,7 @@ object JdbcUtils extends Logging {
def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
throw QueryExecutionErrors.cannotGetJdbcTypeError(dt))
}
/**
@ -240,12 +241,12 @@ object JdbcUtils extends Logging {
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ =>
throw new SQLException("Unrecognized SQL type " + sqlType)
throw QueryExecutionErrors.unrecognizedSqlTypeError(sqlType)
// scalastyle:on
}
if (answer == null) {
throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
throw QueryExecutionErrors.unsupportedJdbcTypeError(JDBCType.valueOf(sqlType).getName)
}
answer
}
@ -521,11 +522,10 @@ object JdbcUtils extends Logging {
}
case LongType if metadata.contains("binarylong") =>
throw new IllegalArgumentException(s"Unsupported array element " +
s"type ${dt.catalogString} based on binary")
throw QueryExecutionErrors.unsupportedArrayElementTypeBasedOnBinaryError(dt)
case ArrayType(_, _) =>
throw new IllegalArgumentException("Nested arrays unsupported")
throw QueryExecutionErrors.nestedArraysUnsupportedError()
case _ => (array: Object) => array.asInstanceOf[Array[Any]]
}
@ -536,7 +536,7 @@ object JdbcUtils extends Logging {
array => new GenericArrayData(elementConversion.apply(array.getArray)))
row.update(pos, array)
case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.catalogString}")
case _ => throw QueryExecutionErrors.unsupportedJdbcTypeError(dt.catalogString)
}
private def nullSafeConvert[T](input: T, f: T => Any): Any = {
@ -626,8 +626,7 @@ object JdbcUtils extends Logging {
case _ =>
(_: PreparedStatement, _: Row, pos: Int) =>
throw new IllegalArgumentException(
s"Can't translate non-null value for field $pos")
throw QueryExecutionErrors.cannotTranslateNonNullValueForFieldError(pos)
}
/**
@ -825,9 +824,8 @@ object JdbcUtils extends Logging {
// checks if user specified column names exist in the DataFrame schema
userSchema.fieldNames.foreach { col =>
schema.find(f => nameEquality(f.name, col)).getOrElse {
throw new AnalysisException(
s"createTableColumnTypes option column $col not found in schema " +
schema.catalogString)
throw QueryCompilationErrors.createTableColumnTypesOptionColumnNotFoundInSchemaError(
col, schema)
}
}
@ -882,9 +880,8 @@ object JdbcUtils extends Logging {
val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
val repartitionedDF = options.numPartitions match {
case Some(n) if n <= 0 => throw new IllegalArgumentException(
s"Invalid value `$n` for parameter `${JDBCOptions.JDBC_NUM_PARTITIONS}` in table writing " +
"via JDBC. The minimum value is 1.")
case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
n, JDBCOptions.JDBC_NUM_PARTITIONS)
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
@ -951,8 +948,7 @@ object JdbcUtils extends Logging {
metaData.getDatabaseMajorVersion)(0))
} else {
if (!metaData.supportsTransactions) {
throw new SQLFeatureNotSupportedException("The target JDBC server does not support " +
"transaction and can only support ALTER TABLE with a single action.")
throw QueryExecutionErrors.transactionUnsupportedByJdbcServerError()
} else {
conn.setAutoCommit(false)
val statement = conn.createStatement
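A sketch of a few of the JdbcUtils helpers referenced above. Exception and parameter types are inferred from the replaced code and may differ from the actual implementation; in particular, the two removed call sites now routed through `unsupportedJdbcTypeError` used `SQLException` and `IllegalArgumentException` respectively, so a single helper can preserve only one of those types.

```scala
import java.sql.{SQLException, SQLFeatureNotSupportedException}

import org.apache.spark.sql.types.DataType

// Hypothetical sketch of some JdbcUtils-related helpers; not the actual implementation.
object JdbcUtilsErrorsSketch {
  def cannotGetJdbcTypeError(dt: DataType): Throwable =
    new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}")

  def unrecognizedSqlTypeError(sqlType: Int): Throwable =
    new SQLException(s"Unrecognized SQL type $sqlType")

  // SQLException is an assumption; one replaced site threw IllegalArgumentException.
  def unsupportedJdbcTypeError(content: String): Throwable =
    new SQLException(s"Unsupported type $content")

  def cannotTranslateNonNullValueForFieldError(pos: Int): Throwable =
    new IllegalArgumentException(s"Can't translate non-null value for field $pos")

  def invalidJdbcNumPartitionsError(n: Int, jdbcNumPartitions: String): Throwable =
    new IllegalArgumentException(
      s"Invalid value `$n` for parameter `$jdbcNumPartitions` in table writing " +
        "via JDBC. The minimum value is 1.")

  def transactionUnsupportedByJdbcServerError(): Throwable =
    new SQLFeatureNotSupportedException(
      "The target JDBC server does not support transaction and " +
        "can only support ALTER TABLE with a single action.")
}
```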


@ -23,6 +23,7 @@ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@ -189,7 +190,7 @@ class OrcDeserializer(
case udt: UserDefinedType[_] => newWriter(udt.sqlType, updater)
case _ =>
throw new UnsupportedOperationException(s"$dataType is not supported yet.")
throw QueryExecutionErrors.dataTypeUnsupportedYetError(dataType)
}
private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {


@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@ -146,7 +146,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
case DateType => PredicateLeaf.Type.DATE
case TimestampType => PredicateLeaf.Type.TIMESTAMP
case _: DecimalType => PredicateLeaf.Type.DECIMAL
case _ => throw new UnsupportedOperationException(s"DataType: ${dataType.catalogString}")
case _ => throw QueryExecutionErrors.unsupportedOperationForDataTypeError(dataType)
}
/**
@ -199,8 +199,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
case other =>
buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
throw new SparkException(
"The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
throw QueryExecutionErrors.inputFilterNotFullyConvertibleError(
"OrcFilters.buildSearchArgument")
}
}
}


@ -24,6 +24,7 @@ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
/**
@ -206,7 +207,7 @@ class OrcSerializer(dataSchema: StructType) {
case udt: UserDefinedType[_] => newConverter(udt.sqlType)
case _ =>
throw new UnsupportedOperationException(s"$dataType is not supported yet.")
throw QueryExecutionErrors.dataTypeUnsupportedYetError(dataType)
}
/**


@ -26,13 +26,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.{ThreadUtils, Utils}
@ -76,7 +77,7 @@ object OrcUtils extends Logging {
logWarning(s"Skipped the footer in the corrupted file: $file", e)
None
} else {
throw new SparkException(s"Could not read footer for file: $file", e)
throw QueryExecutionErrors.cannotReadFooterForFileError(file, e)
}
}
}
@ -189,8 +190,8 @@ object OrcUtils extends Logging {
// Need to fail if there is ambiguity, i.e. more than one field is matched.
val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
reader.close()
throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """
+ s"$matchedOrcFieldsString in case-insensitive mode")
throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
requiredFieldName, matchedOrcFieldsString)
} else {
idx
}
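The ORC hunks above (OrcDeserializer, OrcFilters, OrcSerializer, OrcUtils) route their exceptions through helpers that plausibly look like the sketch below; names and messages come from the diff, while parameter and exception types are assumptions. The last two helpers are also reused by the Parquet hunks further below.

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.types.DataType

// Hypothetical sketch of the ORC-related helpers called above.
object OrcErrorsSketch {
  def dataTypeUnsupportedYetError(dataType: DataType): Throwable =
    new UnsupportedOperationException(s"$dataType is not supported yet.")

  def unsupportedOperationForDataTypeError(dataType: DataType): Throwable =
    new UnsupportedOperationException(s"DataType: ${dataType.catalogString}")

  def inputFilterNotFullyConvertibleError(owner: String): Throwable =
    new SparkException(s"The input filter of $owner should be fully convertible.")

  // Sketched for the OrcUtils call site; the Parquet footer hunk below previously
  // threw an IOException with a FileStatus, so the real helper may be overloaded.
  def cannotReadFooterForFileError(file: Path, e: Exception): Throwable =
    new SparkException(s"Could not read footer for file: $file", e)

  def foundDuplicateFieldInCaseInsensitiveModeError(
      requiredFieldName: String, matchedFields: String): Throwable =
    new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """ +
      s"$matchedFields in case-insensitive mode")
}
```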


@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.IOException
import java.net.URI
import scala.collection.JavaConverters._
@ -37,7 +36,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@ -45,6 +44,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
@ -430,7 +430,7 @@ object ParquetFileFormat extends Logging {
finalSchemas.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(left, right, e)
}
}
}
@ -457,7 +457,7 @@ object ParquetFileFormat extends Logging {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile, e)
}
}
}.flatten


@ -31,6 +31,7 @@ import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types._
@ -342,8 +343,8 @@ object ParquetReadSupport {
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
s"$parquetTypesString in case-insensitive mode")
throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
f.name, parquetTypesString)
} else {
clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
}
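The ParquetFileFormat and ParquetReadSupport hunks above mostly reuse the footer and duplicate-field helpers sketched in the ORC section. The one Parquet-specific addition here, `failedToMergeIncompatibleSchemasError`, presumably looks something like the following; the parameter types are inferred from the schema-merging code it replaces.

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.types.StructType

// Hypothetical sketch; `left` and `right` are the two schemas whose merge failed.
object ParquetMergeErrorsSketch {
  def failedToMergeIncompatibleSchemasError(
      left: StructType, right: StructType, e: Throwable): Throwable =
    new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
```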


@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@ -98,13 +99,13 @@ class ParquetToSparkSchemaConverter(
if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
def typeNotSupported() =
throw new AnalysisException(s"Parquet type not supported: $typeString")
throw QueryCompilationErrors.parquetTypeUnsupportedError(typeString)
def typeNotImplemented() =
throw new AnalysisException(s"Parquet type not yet supported: $typeString")
throw QueryCompilationErrors.parquetTypeUnsupportedYetError(typeString)
def illegalType() =
throw new AnalysisException(s"Illegal Parquet type: $typeString")
throw QueryCompilationErrors.illegalParquetTypeError(typeString)
// When maxPrecision = -1, we skip precision range check, and always respect the precision
// specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored
@ -233,7 +234,7 @@ class ParquetToSparkSchemaConverter(
valueContainsNull = valueOptional)
case _ =>
throw new AnalysisException(s"Unrecognized Parquet type: $field")
throw QueryCompilationErrors.unrecognizedParquetTypeError(field.toString)
}
}
@ -550,7 +551,7 @@ class SparkToParquetSchemaConverter(
convertField(field.copy(dataType = udt.sqlType))
case _ =>
throw new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
throw QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field)
}
}
}
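The schema-converter hunks above move their AnalysisException messages into QueryCompilationErrors. A sketch of what those helpers plausibly look like follows; the messages and the AnalysisException type come from the removed lines, while the parameter types and object name are illustrative. The package declaration is included because such helpers live under org.apache.spark.sql, where the AnalysisException constructor is accessible.

```scala
package org.apache.spark.sql.errors

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructField

// Hypothetical sketch of the Parquet schema-conversion helpers.
object ParquetSchemaErrorsSketch {
  def parquetTypeUnsupportedError(parquetType: String): Throwable =
    new AnalysisException(s"Parquet type not supported: $parquetType")

  def parquetTypeUnsupportedYetError(parquetType: String): Throwable =
    new AnalysisException(s"Parquet type not yet supported: $parquetType")

  def illegalParquetTypeError(parquetType: String): Throwable =
    new AnalysisException(s"Illegal Parquet type: $parquetType")

  def unrecognizedParquetTypeError(field: String): Throwable =
    new AnalysisException(s"Unrecognized Parquet type: $field")

  def cannotConvertDataTypeToParquetTypeError(field: StructField): Throwable =
    new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
}
```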


@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.InsertableRelation
@ -56,8 +57,8 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
if (!isFileFormat ||
dataSource.className.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Unsupported data source type for direct query on files: " +
s"${dataSource.className}")
throw QueryCompilationErrors.unsupportedDataSourceTypeForDirectQueryOnFilesError(
dataSource.className)
}
LogicalRelation(dataSource.resolveRelation())
} catch {
@ -107,7 +108,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
val existingTable = catalog.getTableMetadata(tableIdentWithDB)
if (existingTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("Saving data into a view is not allowed.")
throw QueryCompilationErrors.saveDataIntoViewNotAllowedError()
}
// Check if the specified data source match the data source of the existing table.
@ -118,24 +119,19 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// If the one of the provider is [[FileDataSourceV2]] and the other one is its corresponding
// [[FileFormat]], the two providers are considered compatible.
if (fallBackV2ToV1(existingProvider) != fallBackV2ToV1(specifiedProvider)) {
throw new AnalysisException(s"The format of the existing table $tableName is " +
s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
s"`${specifiedProvider.getSimpleName}`.")
throw QueryCompilationErrors.mismatchedTableFormatError(
tableName, existingProvider, specifiedProvider)
}
tableDesc.storage.locationUri match {
case Some(location) if location.getPath != existingTable.location.getPath =>
throw new AnalysisException(
s"The location of the existing table ${tableIdentWithDB.quotedString} is " +
s"`${existingTable.location}`. It doesn't match the specified location " +
s"`${tableDesc.location}`.")
throw QueryCompilationErrors.mismatchedTableLocationError(
tableIdentWithDB, existingTable, tableDesc)
case _ =>
}
if (query.schema.length != existingTable.schema.length) {
throw new AnalysisException(
s"The column number of the existing table $tableName" +
s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
s"(${query.schema.catalogString})")
throw QueryCompilationErrors.mismatchedTableColumnNumberError(
tableName, existingTable, query)
}
val resolver = conf.resolver
@ -147,8 +143,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
val adjustedColumns = tableCols.map { col =>
query.resolve(Seq(col), resolver).getOrElse {
val inputColumns = query.schema.map(_.name).mkString(", ")
throw new AnalysisException(
s"cannot resolve '$col' given input columns: [$inputColumns]")
throw QueryCompilationErrors.cannotResolveColumnGivenInputColumnsError(col, inputColumns)
}
}
@ -157,12 +152,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
tableName, tableCols, tableDesc.partitionColumnNames, resolver)
if (specifiedPartCols != existingTable.partitionColumnNames) {
val existingPartCols = existingTable.partitionColumnNames.mkString(", ")
throw new AnalysisException(
s"""
|Specified partitioning does not match that of the existing table $tableName.
|Specified partition columns: [${specifiedPartCols.mkString(", ")}]
|Existing partition columns: [$existingPartCols]
""".stripMargin)
throw QueryCompilationErrors.mismatchedTablePartitionColumnError(
tableName, specifiedPartCols, existingPartCols)
}
// Check if the specified bucketing match the existing table.
@ -174,12 +165,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
val existingBucketString =
existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
throw new AnalysisException(
s"""
|Specified bucketing does not match that of the existing table $tableName.
|Specified bucketing: $specifiedBucketString
|Existing bucketing: $existingBucketString
""".stripMargin)
throw QueryCompilationErrors.mismatchedTableBucketingError(
tableName, specifiedBucketString, existingBucketString)
}
val newQuery = if (adjustedColumns != query.output) {
@ -257,8 +244,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
if (schema.isEmpty) {
if (partitioning.nonEmpty) {
throw new AnalysisException("It is not allowed to specify partitioning when the " +
"table schema is not defined.")
throw QueryCompilationErrors.specifyPartitionNotAllowedWhenTableSchemaNotDefinedError()
}
create
@ -298,12 +284,12 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
normalizedBucketSpec.foreach { spec =>
for (bucketCol <- spec.bucketColumnNames if normalizedPartCols.contains(bucketCol)) {
throw new AnalysisException(s"bucketing column '$bucketCol' should not be part of " +
s"partition columns '${normalizedPartCols.mkString(", ")}'")
throw QueryCompilationErrors.bucketingColumnCannotBePartOfPartitionColumnsError(
bucketCol, normalizedPartCols)
}
for (sortCol <- spec.sortColumnNames if normalizedPartCols.contains(sortCol)) {
throw new AnalysisException(s"bucket sorting column '$sortCol' should not be part of " +
s"partition columns '${normalizedPartCols.mkString(", ")}'")
throw QueryCompilationErrors.bucketSortingColumnCannotBePartOfPartitionColumnsError(
sortCol, normalizedPartCols)
}
}
@ -394,11 +380,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
if (expectedColumns.length != insert.query.schema.length) {
throw new AnalysisException(
s"$tblName requires that the data to be inserted have the same number of columns as the " +
s"target table: target table has ${insert.table.output.size} column(s) but the " +
s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " +
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
throw QueryCompilationErrors.mismatchedInsertedDataColumnNumberError(
tblName, insert, staticPartCols)
}
val partitionsTrackedByCatalog = catalogTable.isDefined &&
@ -408,8 +391,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
// empty partition column value
if (normalizedPartSpec.values.flatten.exists(v => v != null && v.isEmpty)) {
val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
throw QueryCompilationErrors.invalidPartitionSpecError(
s"The spec ($spec) contains an empty partition column value")
}
}
@ -417,12 +400,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
tblName, expectedColumns, insert.query, byName = false, conf)
if (normalizedPartSpec.nonEmpty) {
if (normalizedPartSpec.size != partColNames.length) {
throw new AnalysisException(
s"""
|Requested partitioning does not match the table $tblName:
|Requested partitions: ${normalizedPartSpec.keys.mkString(",")}
|Table partitions: ${partColNames.mkString(",")}
""".stripMargin)
throw QueryCompilationErrors.requestedPartitionsMismatchTablePartitionsError(
tblName, normalizedPartSpec, partColNames)
}
insert.copy(query = newQuery, partitionSpec = normalizedPartSpec)
@ -458,10 +437,11 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)")
throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError(
"CREATE Hive TABLE (AS SELECT)")
case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) =>
throw new AnalysisException(
"Hive support is required to INSERT OVERWRITE DIRECTORY with the Hive format")
throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError(
"INSERT OVERWRITE DIRECTORY with the Hive format")
case _ => // OK
}
}
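Finally, the rules.scala hunks above lean on a set of DDL-oriented helpers in QueryCompilationErrors. A sketch of a representative few, with wording taken from the removed AnalysisException messages and everything else (parameter types, object name, package placement) inferred:

```scala
package org.apache.spark.sql.errors

import org.apache.spark.sql.AnalysisException

// Hypothetical sketch; only a few of the helpers used above are shown.
object DdlRuleErrorsSketch {
  def unsupportedDataSourceTypeForDirectQueryOnFilesError(className: String): Throwable =
    new AnalysisException(
      s"Unsupported data source type for direct query on files: $className")

  def saveDataIntoViewNotAllowedError(): Throwable =
    new AnalysisException("Saving data into a view is not allowed.")

  def cannotResolveColumnGivenInputColumnsError(col: String, inputColumns: String): Throwable =
    new AnalysisException(s"cannot resolve '$col' given input columns: [$inputColumns]")

  def specifyPartitionNotAllowedWhenTableSchemaNotDefinedError(): Throwable =
    new AnalysisException(
      "It is not allowed to specify partitioning when the table schema is not defined.")

  def bucketingColumnCannotBePartOfPartitionColumnsError(
      bucketCol: String, normalizedPartCols: Seq[String]): Throwable =
    new AnalysisException(s"bucketing column '$bucketCol' should not be part of " +
      s"partition columns '${normalizedPartCols.mkString(", ")}'")

  def ddlWithoutHiveSupportEnabledError(detail: String): Throwable =
    new AnalysisException(s"Hive support is required to $detail")
}
```

Keeping the message construction in one place leaves the call sites as one-liners and should make a later migration to structured error classes a localized change.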