[SPARK-33542][SQL] Group exception messages in catalyst/catalog

### What changes were proposed in this pull request?
This PR groups the exception messages thrown from `/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog` into named error methods in `QueryCompilationErrors` and `QueryExecutionErrors`, following the pattern sketched below.
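The refactoring pattern is a straightforward extract-method move: each inline `new AnalysisException(...)` (or `SparkException`/`UnsupportedOperationException`) at a call site is replaced by a call to a descriptively named factory method in a shared errors object, keeping the message text identical. A minimal sketch of the pattern, using a hypothetical `ExampleCompilationErrors` object and `registerFunction` call site rather than the real Spark classes:

```scala
import org.apache.spark.sql.AnalysisException

// Hypothetical stand-in for org.apache.spark.sql.errors.QueryCompilationErrors:
// every user-facing message gets one named factory method returning a Throwable.
object ExampleCompilationErrors {
  def functionAlreadyExistsError(func: String): Throwable = {
    new AnalysisException(s"Function $func already exists")
  }
}

// Call site after the refactoring: the message text is unchanged,
// only its construction moves into the shared errors object.
def registerFunction(name: String, alreadyExists: Boolean): Unit = {
  if (alreadyExists) {
    throw ExampleCompilationErrors.functionAlreadyExistsError(name)
  }
}
```

Centralizing the messages this way gives a single place to audit and standardize their wording and to maintain them, without touching the many call sites again.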

### Why are the changes needed?
It will greatly help standardize error messages and ease their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests; all existing tests pass, confirming that no existing behavior is broken.

Closes #30870 from beliefer/SPARK-33542.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit cc1d9d25fb, parent 14c2edae7e, authored by gengjiaan on 2021-01-05 16:15:33 +00:00 and committed by Wenchen Fan. 7 changed files with 295 additions and 105 deletions.

@ -17,8 +17,10 @@
package org.apache.spark.sql.errors
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedView}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SerdeInfo}
@ -364,10 +366,16 @@ object QueryCompilationErrors {
new AnalysisException(s"undefined function $name")
}
def invalidFunctionArgumentsError(
name: String, expectedInfo: String, actualNumber: Int): Throwable = {
new AnalysisException(s"Invalid number of arguments for function $name. " +
s"Expected: $expectedInfo; Found: $actualNumber")
}
def invalidFunctionArgumentNumberError(
validParametersCount: Seq[Int], name: String, params: Seq[Class[Expression]]): Throwable = {
val invalidArgumentsMsg = if (validParametersCount.length == 0) {
s"Invalid arguments for function $name"
if (validParametersCount.length == 0) {
new AnalysisException(s"Invalid arguments for function $name")
} else {
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
validParametersCount.head.toString
@ -375,10 +383,8 @@ object QueryCompilationErrors {
validParametersCount.init.mkString("one of ", ", ", " and ") +
validParametersCount.last
}
s"Invalid number of arguments for function $name. " +
s"Expected: $expectedNumberOfParameters; Found: ${params.length}"
invalidFunctionArgumentsError(name, expectedNumberOfParameters, params.length)
}
new AnalysisException(invalidArgumentsMsg)
}
def functionAcceptsOnlyOneArgumentError(name: String): Throwable = {
@ -504,4 +510,156 @@ object QueryCompilationErrors {
def columnDoesNotExistError(colName: String): Throwable = {
new AnalysisException(s"Column $colName does not exist")
}
def renameTempViewToExistingViewError(oldName: String, newName: String): Throwable = {
new AnalysisException(
s"rename temporary view from '$oldName' to '$newName': destination view already exists")
}
def databaseNotEmptyError(db: String, details: String): Throwable = {
new AnalysisException(s"Database $db is not empty. One or more $details exist.")
}
def invalidNameForTableOrDatabaseError(name: String): Throwable = {
new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
"Valid names only contain alphabet characters, numbers and _.")
}
def cannotCreateDatabaseWithSameNameAsPreservedDatabaseError(database: String): Throwable = {
new AnalysisException(s"$database is a system preserved database, " +
"you cannot create a database with this name.")
}
def cannotDropDefaultDatabaseError(): Throwable = {
new AnalysisException("Can not drop default database")
}
def cannotUsePreservedDatabaseAsCurrentDatabaseError(database: String): Throwable = {
new AnalysisException(s"$database is a system preserved database, you cannot use it as " +
"current database. To access global temporary views, you should use qualified name with " +
s"the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM $database.viewName.")
}
def createExternalTableWithoutLocationError(): Throwable = {
new AnalysisException("CREATE EXTERNAL TABLE must be accompanied by LOCATION")
}
def cannotOperateManagedTableWithExistingLocationError(
methodName: String, tableIdentifier: TableIdentifier, tableLocation: Path): Throwable = {
new AnalysisException(s"Can not $methodName the managed table('$tableIdentifier')" +
s". The associated location('${tableLocation.toString}') already exists.")
}
def dropNonExistentColumnsNotSupportedError(
nonExistentColumnNames: Seq[String]): Throwable = {
new AnalysisException(
s"""
|Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
|not present in the new schema. We don't support dropping columns yet.
""".stripMargin)
}
def cannotRetrieveTableOrViewNotInSameDatabaseError(
qualifiedTableNames: Seq[QualifiedTableName]): Throwable = {
new AnalysisException("Only the tables/views belong to the same database can be retrieved. " +
s"Querying tables/views are $qualifiedTableNames")
}
def renameTableSourceAndDestinationMismatchError(db: String, newDb: String): Throwable = {
new AnalysisException(
s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
}
def cannotRenameTempViewWithDatabaseSpecifiedError(
oldName: TableIdentifier, newName: TableIdentifier): Throwable = {
new AnalysisException(s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': cannot " +
s"specify database name '${newName.database.get}' in the destination table")
}
def cannotRenameTempViewToExistingTableError(
oldName: TableIdentifier, newName: TableIdentifier): Throwable = {
new AnalysisException(s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': " +
"destination table already exists")
}
def invalidPartitionSpecError(details: String): Throwable = {
new AnalysisException(s"Partition spec is invalid. $details")
}
def functionAlreadyExistsError(func: FunctionIdentifier): Throwable = {
new AnalysisException(s"Function $func already exists")
}
def cannotLoadClassWhenRegisteringFunctionError(
className: String, func: FunctionIdentifier): Throwable = {
new AnalysisException(s"Can not load class '$className' when registering " +
s"the function '$func', please make sure it is on the classpath")
}
def v2CatalogNotSupportFunctionError(
catalog: String, namespace: Seq[String]): Throwable = {
new AnalysisException("V2 catalog does not support functions yet. " +
s"catalog: $catalog, namespace: '${namespace.quoted}'")
}
def resourceTypeNotSupportedError(resourceType: String): Throwable = {
new AnalysisException(s"Resource Type '$resourceType' is not supported.")
}
def tableNotSpecifyDatabaseError(identifier: TableIdentifier): Throwable = {
new AnalysisException(s"table $identifier did not specify database")
}
def tableNotSpecifyLocationUriError(identifier: TableIdentifier): Throwable = {
new AnalysisException(s"table $identifier did not specify locationUri")
}
def partitionNotSpecifyLocationUriError(specString: String): Throwable = {
new AnalysisException(s"Partition [$specString] did not specify locationUri")
}
def invalidBucketNumberError(bucketingMaxBuckets: Int, numBuckets: Int): Throwable = {
new AnalysisException(
s"Number of buckets should be greater than 0 but less than or equal to " +
s"bucketing.maxBuckets (`$bucketingMaxBuckets`). Got `$numBuckets`")
}
def corruptedTableNameContextInCatalogError(numParts: Int, index: Int): Throwable = {
new AnalysisException("Corrupted table name context in catalog: " +
s"$numParts parts expected, but part $index is missing.")
}
def corruptedViewSQLConfigsInCatalogError(e: Exception): Throwable = {
new AnalysisException("Corrupted view SQL configs in catalog", cause = Some(e))
}
def corruptedViewQueryOutputColumnsInCatalogError(numCols: String, index: Int): Throwable = {
new AnalysisException("Corrupted view query output column names in catalog: " +
s"$numCols parts expected, but part $index is missing.")
}
def corruptedViewReferredTempViewInCatalogError(e: Exception): Throwable = {
new AnalysisException("corrupted view referred temp view names in catalog", cause = Some(e))
}
def corruptedViewReferredTempFunctionsInCatalogError(e: Exception): Throwable = {
new AnalysisException(
"corrupted view referred temp functions names in catalog", cause = Some(e))
}
def columnStatisticsDeserializationNotSupportedError(
name: String, dataType: DataType): Throwable = {
new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")
}
def columnStatisticsSerializationNotSupportedError(
colName: String, dataType: DataType): Throwable = {
new AnalysisException("Column statistics serialization is not supported for " +
s"column $colName of data type: $dataType.")
}
def cannotReadCorruptedTablePropertyError(key: String, details: String = ""): Throwable = {
new AnalysisException(s"Cannot read table property '$key' as it's corrupted.$details")
}
}

@ -17,7 +17,13 @@
package org.apache.spark.sql.errors
import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
/**
@ -56,4 +62,54 @@ object QueryExecutionErrors {
def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable = {
new UnsupportedOperationException(s"Cannot terminate expression: $generator")
}
def unableToCreateDatabaseAsFailedToCreateDirectoryError(
dbDefinition: CatalogDatabase, e: IOException): Throwable = {
new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
s"to create its directory ${dbDefinition.locationUri}", e)
}
def unableToDropDatabaseAsFailedToDeleteDirectoryError(
dbDefinition: CatalogDatabase, e: IOException): Throwable = {
new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
s"to delete its directory ${dbDefinition.locationUri}", e)
}
def unableToCreateTableAsFailedToCreateDirectoryError(
table: String, defaultTableLocation: Path, e: IOException): Throwable = {
new SparkException(s"Unable to create table $table as failed " +
s"to create its directory $defaultTableLocation", e)
}
def unableToDeletePartitionPathError(partitionPath: Path, e: IOException): Throwable = {
new SparkException(s"Unable to delete partition path $partitionPath", e)
}
def unableToDropTableAsFailedToDeleteDirectoryError(
table: String, dir: Path, e: IOException): Throwable = {
new SparkException(s"Unable to drop table $table as failed " +
s"to delete its directory $dir", e)
}
def unableToRenameTableAsFailedToRenameDirectoryError(
oldName: String, newName: String, oldDir: Path, e: IOException): Throwable = {
new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
}
def unableToCreatePartitionPathError(partitionPath: Path, e: IOException): Throwable = {
new SparkException(s"Unable to create partition path $partitionPath", e)
}
def unableToRenamePartitionPathError(oldPartPath: Path, e: IOException): Throwable = {
new SparkException(s"Unable to rename partition path $oldPartPath", e)
}
def methodNotImplementedError(methodName: String): Throwable = {
new UnsupportedOperationException(s"$methodName is not implemented")
}
def tableStatsNotSpecifiedError(): Throwable = {
new IllegalStateException("table stats must be specified.")
}
}

@ -21,10 +21,10 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
/**
@ -92,8 +92,7 @@ class GlobalTempViewManager(val database: String) {
def rename(oldName: String, newName: String): Boolean = synchronized {
if (viewDefinitions.contains(oldName)) {
if (viewDefinitions.contains(newName)) {
throw new AnalysisException(
s"rename temporary view from '$oldName' to '$newName': destination view already exists")
throw QueryCompilationErrors.renameTempViewToExistingViewError(oldName, newName)
}
val viewDefinition = viewDefinitions(oldName)

@ -24,13 +24,13 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.StructType
/**
@ -112,8 +112,8 @@ class InMemoryCatalog(
fs.mkdirs(location)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
s"to create its directory ${dbDefinition.locationUri}", e)
throw QueryExecutionErrors.unableToCreateDatabaseAsFailedToCreateDirectoryError(
dbDefinition, e)
}
catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
}
@ -127,10 +127,10 @@ class InMemoryCatalog(
if (!cascade) {
// If cascade is false, make sure the database is empty.
if (catalog(db).tables.nonEmpty) {
throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
throw QueryCompilationErrors.databaseNotEmptyError(db, "tables")
}
if (catalog(db).functions.nonEmpty) {
throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.")
throw QueryCompilationErrors.databaseNotEmptyError(db, "functions")
}
}
// Remove the database.
@ -141,8 +141,8 @@ class InMemoryCatalog(
fs.delete(location, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
s"to delete its directory ${dbDefinition.locationUri}", e)
throw QueryExecutionErrors.unableToDropDatabaseAsFailedToDeleteDirectoryError(
dbDefinition, e)
}
catalog.remove(db)
} else {
@ -209,8 +209,8 @@ class InMemoryCatalog(
fs.mkdirs(defaultTableLocation)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to create table $table as failed " +
s"to create its directory $defaultTableLocation", e)
throw QueryExecutionErrors.unableToCreateTableAsFailedToCreateDirectoryError(
table, defaultTableLocation, e)
}
tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
} else {
@ -239,7 +239,7 @@ class InMemoryCatalog(
fs.delete(partitionPath, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to delete partition path $partitionPath", e)
throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
}
}
assert(tableMeta.storage.locationUri.isDefined,
@ -252,8 +252,8 @@ class InMemoryCatalog(
fs.delete(dir, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to drop table $table as failed " +
s"to delete its directory $dir", e)
throw QueryExecutionErrors.unableToDropTableAsFailedToDeleteDirectoryError(
table, dir, e)
}
}
catalog(db).tables.remove(table)
@ -284,8 +284,8 @@ class InMemoryCatalog(
fs.rename(oldDir, newDir)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
throw QueryExecutionErrors.unableToRenameTableAsFailedToRenameDirectoryError(
oldName, newName, oldDir, e)
}
oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}
@ -358,7 +358,7 @@ class InMemoryCatalog(
loadPath: String,
isOverwrite: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadTable is not implemented")
throw QueryExecutionErrors.methodNotImplementedError("loadTable")
}
override def loadPartition(
@ -369,7 +369,7 @@ class InMemoryCatalog(
isOverwrite: Boolean,
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadPartition is not implemented.")
throw QueryExecutionErrors.methodNotImplementedError("loadPartition")
}
override def loadDynamicPartitions(
@ -379,7 +379,7 @@ class InMemoryCatalog(
partition: TablePartitionSpec,
replace: Boolean,
numDP: Int): Unit = {
throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.")
throw QueryExecutionErrors.methodNotImplementedError("loadDynamicPartitions")
}
// --------------------------------------------------------------------------
@ -416,7 +416,7 @@ class InMemoryCatalog(
}
} catch {
case e: IOException =>
throw new SparkException(s"Unable to create partition path $partitionPath", e)
throw QueryExecutionErrors.unableToCreatePartitionPathError(partitionPath, e)
}
existingParts.put(
@ -457,7 +457,7 @@ class InMemoryCatalog(
fs.delete(partitionPath, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to delete partition path $partitionPath", e)
throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
}
}
existingParts.remove(p)
@ -490,7 +490,7 @@ class InMemoryCatalog(
fs.rename(oldPartPath, newPartPath)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
throw QueryExecutionErrors.unableToRenamePartitionPathError(oldPartPath, e)
}
oldPartition.copy(
spec = newSpec,

@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.types.StructType
@ -120,8 +121,7 @@ class SessionCatalog(
*/
private def validateName(name: String): Unit = {
if (!validNameFormat.pattern.matcher(name).matches()) {
throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
"Valid names only contain alphabet characters, numbers and _.")
throw QueryCompilationErrors.invalidNameForTableOrDatabaseError(name)
}
}
@ -216,9 +216,8 @@ class SessionCatalog(
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
val dbName = formatDatabaseName(dbDefinition.name)
if (dbName == globalTempViewManager.database) {
throw new AnalysisException(
s"${globalTempViewManager.database} is a system preserved database, " +
"you cannot create a database with this name.")
throw QueryCompilationErrors.cannotCreateDatabaseWithSameNameAsPreservedDatabaseError(
globalTempViewManager.database)
}
validateName(dbName)
externalCatalog.createDatabase(
@ -238,7 +237,7 @@ class SessionCatalog(
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = formatDatabaseName(db)
if (dbName == DEFAULT_DATABASE) {
throw new AnalysisException(s"Can not drop default database")
throw QueryCompilationErrors.cannotDropDefaultDatabaseError
}
if (cascade && databaseExists(dbName)) {
listTables(dbName).foreach { t =>
@ -279,11 +278,8 @@ class SessionCatalog(
def setCurrentDatabase(db: String): Unit = {
val dbName = formatDatabaseName(db)
if (dbName == globalTempViewManager.database) {
throw new AnalysisException(
s"${globalTempViewManager.database} is a system preserved database, " +
"you cannot use it as current database. To access global temporary views, you should " +
"use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
s"${globalTempViewManager.database}.viewName.")
throw QueryCompilationErrors.cannotUsePreservedDatabaseAsCurrentDatabaseError(
globalTempViewManager.database)
}
requireDbExists(dbName)
synchronized { currentDb = dbName }
@ -320,7 +316,7 @@ class SessionCatalog(
validateLocation: Boolean = true): Unit = {
val isExternal = tableDefinition.tableType == CatalogTableType.EXTERNAL
if (isExternal && tableDefinition.storage.locationUri.isEmpty) {
throw new AnalysisException(s"CREATE EXTERNAL TABLE must be accompanied by LOCATION")
throw QueryCompilationErrors.createExternalTableWithoutLocationError
}
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
@ -359,8 +355,8 @@ class SessionCatalog(
val fs = tableLocation.getFileSystem(hadoopConf)
if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
s". The associated location('${tableLocation.toString}') already exists.")
throw QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
"create", table.identifier, tableLocation)
}
}
}
@ -428,11 +424,7 @@ class SessionCatalog(
val nonExistentColumnNames =
oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
if (nonExistentColumnNames.nonEmpty) {
throw new AnalysisException(
s"""
|Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
|not present in the new schema. We don't support dropping columns yet.
""".stripMargin)
throw QueryCompilationErrors.dropNonExistentColumnsNotSupportedError(nonExistentColumnNames)
}
externalCatalog.alterTableDataSchema(db, table, newDataSchema)
@ -508,10 +500,8 @@ class SessionCatalog(
if (dbs.distinct.size != 1) {
val tables = names.map(name => formatTableName(name.table))
val qualifiedTableNames = dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}
throw new AnalysisException(
s"Only the tables/views belong to the same database can be retrieved. Querying " +
s"tables/views are $qualifiedTableNames"
)
throw QueryCompilationErrors.cannotRetrieveTableOrViewNotInSameDatabaseError(
qualifiedTableNames)
}
val db = formatDatabaseName(dbs.head)
requireDbExists(db)
@ -722,8 +712,7 @@ class SessionCatalog(
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
newName.database.map(formatDatabaseName).foreach { newDb =>
if (db != newDb) {
throw new AnalysisException(
s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
throw QueryCompilationErrors.renameTableSourceAndDestinationMismatchError(db, newDb)
}
}
@ -741,13 +730,12 @@ class SessionCatalog(
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
if (newName.database.isDefined) {
throw new AnalysisException(
s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': cannot specify database " +
s"name '${newName.database.get}' in the destination table")
throw QueryCompilationErrors.cannotRenameTempViewWithDatabaseSpecifiedError(
oldName, newName)
}
if (tempViews.contains(newTableName)) {
throw new AnalysisException(s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': " +
"destination table already exists")
throw QueryCompilationErrors.cannotRenameTempViewToExistingTableError(
oldName, newName)
}
val table = tempViews(oldTableName)
tempViews.remove(oldTableName)
@ -1192,8 +1180,8 @@ class SessionCatalog(
specs.foreach { s =>
if (s.values.exists(_.isEmpty)) {
val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
throw QueryCompilationErrors.invalidPartitionSpecError(
s"The spec ($spec) contains an empty partition column value")
}
}
}
@ -1223,10 +1211,10 @@ class SessionCatalog(
val defined = table.partitionColumnNames
specs.foreach { s =>
if (!s.keys.forall(defined.contains)) {
throw new AnalysisException(
s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
s"in table '${table.identifier}'")
throw QueryCompilationErrors.invalidPartitionSpecError(
s"The spec (${s.keys.mkString(", ")}) must be contained " +
s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
s"in table '${table.identifier}'")
}
}
}
@ -1382,8 +1370,8 @@ class SessionCatalog(
// Check input argument size
if (e.inputTypes.size != input.size) {
throw new AnalysisException(s"Invalid number of arguments for function $name. " +
s"Expected: ${e.inputTypes.size}; Found: ${input.size}")
throw QueryCompilationErrors.invalidFunctionArgumentsError(
name, e.inputTypes.size.toString, input.size)
}
e
} else {
@ -1409,15 +1397,14 @@ class SessionCatalog(
functionBuilder: Option[FunctionBuilder] = None): Unit = {
val func = funcDefinition.identifier
if (functionRegistry.functionExists(func) && !overrideIfExists) {
throw new AnalysisException(s"Function $func already exists")
throw QueryCompilationErrors.functionAlreadyExistsError(func)
}
val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
val builder =
functionBuilder.getOrElse {
val className = funcDefinition.className
if (!Utils.classIsLoadable(className)) {
throw new AnalysisException(s"Can not load class '$className' when registering " +
s"the function '$func', please make sure it is on the classpath")
throw QueryCompilationErrors.cannotLoadClassWhenRegisteringFunctionError(className, func)
}
makeFunctionBuilder(func.unquotedString, className)
}
@ -1522,7 +1509,6 @@ class SessionCatalog(
def lookupFunction(
name: FunctionIdentifier,
children: Seq[Expression]): Expression = synchronized {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
// Note: the implementation of this function is a little bit convoluted.
// We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
// (built-in, temp, and external).
@ -1545,9 +1531,7 @@ class SessionCatalog(
case Seq() => getCurrentDatabase
case Seq(_, db) => db
case Seq(catalog, namespace @ _*) =>
throw new AnalysisException(
s"V2 catalog does not support functions yet. " +
s"catalog: ${catalog}, namespace: '${namespace.quoted}'")
throw QueryCompilationErrors.v2CatalogNotSupportFunctionError(catalog, namespace)
}
// If the name itself is not qualified, add the current database to it.
@ -1685,8 +1669,8 @@ class SessionCatalog(
val newTableLocation = new Path(new Path(databaseLocation), formatTableName(newName.table))
val fs = newTableLocation.getFileSystem(hadoopConf)
if (fs.exists(newTableLocation)) {
throw new AnalysisException(s"Can not rename the managed table('$oldName')" +
s". The associated location('$newTableLocation') already exists.")
throw QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
"rename", oldName, newTableLocation)
}
}
}

@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors
/** A trait that represents the type of a resourced needed by a function. */
abstract class FunctionResourceType(val resourceType: String)
@ -40,7 +40,7 @@ object FunctionResourceType {
case "file" => FileResource
case "archive" => ArchiveResource
case other =>
throw new AnalysisException(s"Resource Type '$resourceType' is not supported.")
throw QueryCompilationErrors.resourceTypeNotSupportedError(resourceType)
}
}
}

@ -29,7 +29,6 @@ import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@ -145,7 +145,7 @@ case class CatalogTablePartition(
/** Return the partition location, assuming it is specified. */
def location: URI = storage.locationUri.getOrElse {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
throw QueryCompilationErrors.partitionNotSpecifyLocationUriError(specString)
}
/**
@ -182,9 +182,8 @@ case class BucketSpec(
sortColumnNames: Seq[String]) extends SQLConfHelper {
if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
throw new AnalysisException(
s"Number of buckets should be greater than 0 but less than or equal to " +
s"bucketing.maxBuckets (`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")
throw QueryCompilationErrors.invalidBucketNumberError(
conf.bucketingMaxBuckets, numBuckets)
}
override def toString: String = {
@ -274,12 +273,12 @@ case class CatalogTable(
/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
throw new AnalysisException(s"table $identifier did not specify database")
throw QueryCompilationErrors.tableNotSpecifyDatabaseError(identifier)
}
/** Return the table location, assuming it is specified. */
def location: URI = storage.locationUri.getOrElse {
throw new AnalysisException(s"table $identifier did not specify locationUri")
throw QueryCompilationErrors.tableNotSpecifyLocationUriError(identifier)
}
/** Return the fully qualified name of this table, assuming the database was specified. */
@ -295,8 +294,7 @@ case class CatalogTable(
(0 until numParts).map { index =>
properties.getOrElse(
s"$VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX$index",
throw new AnalysisException("Corrupted table name context in catalog: " +
s"$numParts parts expected, but part $index is missing.")
throw QueryCompilationErrors.corruptedTableNameContextInCatalogError(numParts, index)
)
}
} else if (properties.contains(VIEW_DEFAULT_DATABASE)) {
@ -318,8 +316,7 @@ case class CatalogTable(
yield (key.substring(CatalogTable.VIEW_SQL_CONFIG_PREFIX.length), value)
} catch {
case e: Exception =>
throw new AnalysisException(
"Corrupted view SQL configs in catalog", cause = Some(e))
throw QueryCompilationErrors.corruptedViewSQLConfigsInCatalogError(e)
}
}
@ -334,8 +331,7 @@ case class CatalogTable(
index <- 0 until numCols.toInt
} yield properties.getOrElse(
s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
throw new AnalysisException("Corrupted view query output column names in catalog: " +
s"$numCols parts expected, but part $index is missing.")
throw QueryCompilationErrors.corruptedViewQueryOutputColumnsInCatalogError(numCols, index)
)
}
@ -352,8 +348,7 @@ case class CatalogTable(
}.getOrElse(Seq.empty)
} catch {
case e: Exception =>
throw new AnalysisException(
"corrupted view referred temp view names in catalog", cause = Some(e))
throw QueryCompilationErrors.corruptedViewReferredTempViewInCatalogError(e)
}
}
@ -368,8 +363,7 @@ case class CatalogTable(
}.getOrElse(Seq.empty)
} catch {
case e: Exception =>
throw new AnalysisException(
"corrupted view referred temp functions names in catalog", cause = Some(e))
throw QueryCompilationErrors.corruptedViewReferredTempFunctionsInCatalogError(e)
}
}
@ -497,14 +491,13 @@ object CatalogTable {
None
} else {
val numParts = props.get(s"$key.numParts")
val errorMessage = s"Cannot read table property '$key' as it's corrupted."
if (numParts.isEmpty) {
throw new AnalysisException(errorMessage)
throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(key)
} else {
val parts = (0 until numParts.get.toInt).map { index =>
props.getOrElse(s"$key.part.$index", {
throw new AnalysisException(
s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(
key, s"Missing part $index, $numParts parts are expected.")
})
}
Some(parts.mkString)
@ -657,8 +650,8 @@ object CatalogColumnStat extends Logging {
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")
throw QueryCompilationErrors.columnStatisticsDeserializationNotSupportedError(
name, dataType)
}
}
@ -674,8 +667,8 @@ object CatalogColumnStat extends Logging {
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw new AnalysisException("Column statistics serialization is not supported for " +
s"column $colName of data type: $dataType.")
throw QueryCompilationErrors.columnStatisticsSerializationNotSupportedError(
colName, dataType)
}
externalValue.toString
}
@ -805,7 +798,7 @@ case class HiveTableRelation(
tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
.orElse(tableStats)
.getOrElse {
throw new IllegalStateException("table stats must be specified.")
throw QueryExecutionErrors.tableStatsNotSpecifiedError
}
}