[SPARK-32670][SQL] Group exception messages in Catalyst Analyzer in one file

### What changes were proposed in this pull request?

Group all messages of `AnalysisException`s created and thrown directly in `org.apache.spark.sql.catalyst.analysis.Analyzer` into one file.
* Create a new object, `org.apache.spark.sql.errors.QueryCompilationErrors`, containing the exception-creating functions.
* When the `Analyzer` needs to create and throw an `AnalysisException`, it calls the corresponding function of `QueryCompilationErrors` (see the sketch below).
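
A minimal sketch of the pattern, using `groupingSizeTooLargeError` from the diff below as the example:

```scala
import org.apache.spark.sql.AnalysisException

// Central error object: one function per error message, returning the
// exception so that call sites can simply throw the result.
object QueryCompilationErrors {
  def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
    new AnalysisException(
      s"Grouping sets size cannot be greater than $sizeLimit")
  }
}

// Call site in the Analyzer, replacing an inline `new AnalysisException(...)`:
// throw QueryCompilationErrors.groupingSizeTooLargeError(GroupingID.dataType.defaultSize * 8)
```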

### Why are the changes needed?

This is the sample PR for the effort of grouping exception messages together into several dedicated files. It will greatly help standardize error messages and simplify their maintenance.

### Does this PR introduce _any_ user-facing change?

No. Error messages remain unchanged.

### How was this patch tested?

No new tests; all existing tests must pass to make sure the change doesn't break any existing behavior.

### Naming of exception functions

All function names end with `Error` (illustrated by the signatures below).
* For specific errors like `groupingIDMismatch` and `groupingColInvalid`, use the description directly as the name, e.g. `groupingIDMismatchError` and `groupingColInvalidError`.
* For generic errors like `dataTypeMismatch`:
  * if the context is clear, add a prefix describing the condition, e.g. `pivotValDataTypeMismatchError`;
  * if the context is unclear, add a `For` suffix naming the specific component the exception relates to, e.g. `dataTypeMismatchForDeserializerError`.
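
For illustration, the corresponding signatures excerpted from the new `QueryCompilationErrors` file below:

```scala
// Specific error: the error description itself is the function name.
def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable

// Generic error, context clear: the condition is added as a prefix.
def pivotValDataTypeMismatchError(pivotVal: Expression, pivotCol: Expression): Throwable

// Generic error, context unclear: `For` plus the component is added as a suffix.
def dataTypeMismatchForDeserializerError(dataType: DataType, desiredType: String): Throwable
```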

Closes #29497 from anchovYu/32670.

Lead-authored-by: anchovYu <aureole@sjtu.edu.cn>
Co-authored-by: anchovYu <xyyu15@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
anchovYu authored 2020-11-21 08:33:39 +09:00; committed by HyukjinKwon
commit de0f50abf4 (parent 2479778934)
2 changed files with 192 additions and 56 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala (new file)

@@ -0,0 +1,164 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.errors

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}
/**
 * Object for grouping all error messages of the query compilation.
 * Currently it includes all AnalysisExceptions created and thrown directly in
 * org.apache.spark.sql.catalyst.analysis.Analyzer.
 */
object QueryCompilationErrors {

  def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = {
    new AnalysisException(
      s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " +
        s"does not match grouping columns (${groupByExprs.mkString(",")})")
  }

  def groupingColInvalidError(groupingCol: Expression, groupByExprs: Seq[Expression]): Throwable = {
    new AnalysisException(
      s"Column of grouping ($groupingCol) can't be found " +
        s"in grouping columns ${groupByExprs.mkString(",")}")
  }

  def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
    new AnalysisException(
      s"Grouping sets size cannot be greater than $sizeLimit")
  }

  def unorderablePivotColError(pivotCol: Expression): Throwable = {
    new AnalysisException(
      s"Invalid pivot column '$pivotCol'. Pivot columns must be comparable."
    )
  }

  def nonLiteralPivotValError(pivotVal: Expression): Throwable = {
    new AnalysisException(
      s"Literal expressions required for pivot values, found '$pivotVal'")
  }

  def pivotValDataTypeMismatchError(pivotVal: Expression, pivotCol: Expression): Throwable = {
    new AnalysisException(
      s"Invalid pivot value '$pivotVal': " +
        s"value data type ${pivotVal.dataType.simpleString} does not match " +
        s"pivot column data type ${pivotCol.dataType.catalogString}")
  }

  def unsupportedIfNotExistsError(tableName: String): Throwable = {
    new AnalysisException(
      s"Cannot write, IF NOT EXISTS is not supported for table: $tableName")
  }

  def nonPartitionColError(partitionName: String): Throwable = {
    new AnalysisException(
      s"PARTITION clause cannot contain a non-partition column name: $partitionName")
  }

  def addStaticValToUnknownColError(staticName: String): Throwable = {
    new AnalysisException(
      s"Cannot add static value for unknown column: $staticName")
  }

  def unknownStaticPartitionColError(name: String): Throwable = {
    new AnalysisException(s"Unknown static partition column: $name")
  }

  def nestedGeneratorError(trimmedNestedGenerator: Expression): Throwable = {
    new AnalysisException(
      "Generators are not supported when it's nested in " +
        "expressions, but got: " + toPrettySQL(trimmedNestedGenerator))
  }

  def moreThanOneGeneratorError(generators: Seq[Expression], clause: String): Throwable = {
    new AnalysisException(
      s"Only one generator allowed per $clause clause but found " +
        generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
  }

  def generatorOutsideSelectError(plan: LogicalPlan): Throwable = {
    new AnalysisException(
      "Generators are not supported outside the SELECT clause, but " +
        "got: " + plan.simpleString(SQLConf.get.maxToStringFields))
  }

  def legacyStoreAssignmentPolicyError(): Throwable = {
    val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
    new AnalysisException(
      "LEGACY store assignment policy is disallowed in Spark data source V2. " +
        s"Please set the configuration $configKey to other values.")
  }

  def unresolvedUsingColForJoinError(
      colName: String, plan: LogicalPlan, side: String): Throwable = {
    new AnalysisException(
      s"USING column `$colName` cannot be resolved on the $side " +
        s"side of the join. The $side-side columns: [${plan.output.map(_.name).mkString(", ")}]")
  }

  def dataTypeMismatchForDeserializerError(
      dataType: DataType, desiredType: String): Throwable = {
    val quantifier = if (desiredType.equals("array")) "an" else "a"
    new AnalysisException(
      s"need $quantifier $desiredType field but got " + dataType.catalogString)
  }

  def fieldNumberMismatchForDeserializerError(
      schema: StructType, maxOrdinal: Int): Throwable = {
    new AnalysisException(
      s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}, " +
        "but failed as the number of fields does not line up.")
  }

  def upCastFailureError(
      fromStr: String, from: Expression, to: DataType, walkedTypePath: Seq[String]): Throwable = {
    new AnalysisException(
      s"Cannot up cast $fromStr from " +
        s"${from.dataType.catalogString} to ${to.catalogString}.\n" +
        s"The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
        "You can either add an explicit cast to the input data or choose a higher precision " +
        "type of the field in the target object")
  }

  def unsupportedAbstractDataTypeForUpCastError(gotType: AbstractDataType): Throwable = {
    new AnalysisException(
      s"UpCast only support DecimalType as AbstractDataType yet, but got: $gotType")
  }

  def outerScopeFailureForNewInstanceError(className: String): Throwable = {
    new AnalysisException(
      s"Unable to generate an encoder for inner class `$className` without " +
        "access to the scope that this class was defined in.\n" +
        "Try moving this class out of its parent class.")
  }

  def referenceColNotFoundForAlterTableChangesError(
      after: TableChange.After, parentName: String): Throwable = {
    new AnalysisException(
      s"Couldn't find the reference column for $after at $parentName")
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

@@ -44,6 +44,7 @@ import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssignmentPolicy}
@@ -448,9 +449,7 @@ class Analyzer(override val catalogManager: CatalogManager)
               e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) {
             Alias(gid, toPrettySQL(e))()
           } else {
-            throw new AnalysisException(
-              s"Columns of grouping_id (${e.groupByExprs.mkString(",")}) does not match " +
-                s"grouping columns (${groupByExprs.mkString(",")})")
+            throw QueryCompilationErrors.groupingIDMismatchError(e, groupByExprs)
           }
         case e @ Grouping(col: Expression) =>
           val idx = groupByExprs.indexWhere(_.semanticEquals(col))
@@ -458,8 +457,7 @@ class Analyzer(override val catalogManager: CatalogManager)
             Alias(Cast(BitwiseAnd(ShiftRight(gid, Literal(groupByExprs.length - 1 - idx)),
               Literal(1L)), ByteType), toPrettySQL(e))()
           } else {
-            throw new AnalysisException(s"Column of grouping ($col) can't be found " +
-              s"in grouping columns ${groupByExprs.mkString(",")}")
+            throw QueryCompilationErrors.groupingColInvalidError(col, groupByExprs)
           }
       }
     }
@@ -575,8 +573,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       val finalGroupByExpressions = getFinalGroupByExpressions(selectedGroupByExprs, groupByExprs)

       if (finalGroupByExpressions.size > GroupingID.dataType.defaultSize * 8) {
-        throw new AnalysisException(
-          s"Grouping sets size cannot be greater than ${GroupingID.dataType.defaultSize * 8}")
+        throw QueryCompilationErrors.groupingSizeTooLargeError(GroupingID.dataType.defaultSize * 8)
       }

       // Expand works by setting grouping expressions to null as determined by the
@@ -712,8 +709,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p
       case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) =>
         if (!RowOrdering.isOrderable(pivotColumn.dataType)) {
-          throw new AnalysisException(
-            s"Invalid pivot column '${pivotColumn}'. Pivot columns must be comparable.")
+          throw QueryCompilationErrors.unorderablePivotColError(pivotColumn)
         }
         // Check all aggregate expressions.
         aggregates.foreach(checkValidAggregateExpression)
@@ -724,13 +720,10 @@ class Analyzer(override val catalogManager: CatalogManager)
             case _ => value.foldable
           }
           if (!foldable) {
-            throw new AnalysisException(
-              s"Literal expressions required for pivot values, found '$value'")
+            throw QueryCompilationErrors.nonLiteralPivotValError(value)
           }
           if (!Cast.canCast(value.dataType, pivotColumn.dataType)) {
-            throw new AnalysisException(s"Invalid pivot value '$value': " +
-              s"value data type ${value.dataType.simpleString} does not match " +
-              s"pivot column data type ${pivotColumn.dataType.catalogString}")
+            throw QueryCompilationErrors.pivotValDataTypeMismatchError(value, pivotColumn)
           }
           Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow)
         }
@@ -1167,8 +1160,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is not supported
         if (i.ifPartitionNotExists) {
-          throw new AnalysisException(
-            s"Cannot write, IF NOT EXISTS is not supported for table: ${r.table.name}")
+          throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
         }

         val partCols = partitionColumnNames(r.table)
@@ -1205,8 +1197,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         partitionColumnNames.find(name => conf.resolver(name, partitionName)) match {
           case Some(_) =>
           case None =>
-            throw new AnalysisException(
-              s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+            throw QueryCompilationErrors.nonPartitionColError(partitionName)
         }
       }
     }
@@ -1228,8 +1219,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           case Some(attr) =>
             attr.name -> staticName
           case _ =>
-            throw new AnalysisException(
-              s"Cannot add static value for unknown column: $staticName")
+            throw QueryCompilationErrors.addStaticValToUnknownColError(staticName)
         }).toMap

         val queryColumns = query.output.iterator
@@ -1271,7 +1261,7 @@ class Analyzer(override val catalogManager: CatalogManager)
             // an UnresolvedAttribute.
             EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
           case None =>
-            throw new AnalysisException(s"Unknown static partition column: $name")
+            throw QueryCompilationErrors.unknownStaticPartitionColError(name)
         }
       }.reduce(And)
     }
@@ -2483,23 +2473,19 @@ class Analyzer(override val catalogManager: CatalogManager)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
-        throw new AnalysisException("Generators are not supported when it's nested in " +
-          "expressions, but got: " + toPrettySQL(trimAlias(nestedGenerator)))
+        throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))

       case Project(projectList, _) if projectList.count(hasGenerator) > 1 =>
         val generators = projectList.filter(hasGenerator).map(trimAlias)
-        throw new AnalysisException("Only one generator allowed per select clause but found " +
-          generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+        throw QueryCompilationErrors.moreThanOneGeneratorError(generators, "select")

       case Aggregate(_, aggList, _) if aggList.exists(hasNestedGenerator) =>
         val nestedGenerator = aggList.find(hasNestedGenerator).get
-        throw new AnalysisException("Generators are not supported when it's nested in " +
-          "expressions, but got: " + toPrettySQL(trimAlias(nestedGenerator)))
+        throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))

       case Aggregate(_, aggList, _) if aggList.count(hasGenerator) > 1 =>
         val generators = aggList.filter(hasGenerator).map(trimAlias)
-        throw new AnalysisException("Only one generator allowed per aggregate clause but found " +
-          generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+        throw QueryCompilationErrors.moreThanOneGeneratorError(generators, "aggregate")

       case agg @ Aggregate(groupList, aggList, child) if aggList.forall {
         case AliasedGenerator(_, _, _) => true
@@ -2582,8 +2568,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case g: Generate => g

       case p if p.expressions.exists(hasGenerator) =>
-        throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
-          "got: " + p.simpleString(SQLConf.get.maxToStringFields))
+        throw QueryCompilationErrors.generatorOutsideSelectError(p)
     }
   }
@@ -3122,10 +3107,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   private def validateStoreAssignmentPolicy(): Unit = {
     // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
     if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
-      val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
-      throw new AnalysisException(s"""
-        |"LEGACY" store assignment policy is disallowed in Spark data source V2.
-        |Please set the configuration $configKey to other values.""".stripMargin)
+      throw QueryCompilationErrors.legacyStoreAssignmentPolicyError()
     }
   }
@@ -3138,14 +3120,12 @@ class Analyzer(override val catalogManager: CatalogManager)
       hint: JoinHint) = {
     val leftKeys = joinNames.map { keyName =>
       left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the left " +
-          s"side of the join. The left-side columns: [${left.output.map(_.name).mkString(", ")}]")
+        throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
       }
     }
     val rightKeys = joinNames.map { keyName =>
       right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the right " +
-          s"side of the join. The right-side columns: [${right.output.map(_.name).mkString(", ")}]")
+        throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, right, "right")
       }
     }
     val joinPairs = leftKeys.zip(rightKeys)
@@ -3208,7 +3188,8 @@ class Analyzer(override val catalogManager: CatalogManager)
               ExtractValue(child, fieldName, resolver)
             }
           case other =>
-            throw new AnalysisException("need an array field but got " + other.catalogString)
+            throw QueryCompilationErrors.dataTypeMismatchForDeserializerError(other,
+              "array")
         }
       case u: UnresolvedCatalystToExternalMap if u.child.resolved =>
         u.child.dataType match {
@@ -3218,7 +3199,7 @@ class Analyzer(override val catalogManager: CatalogManager)
               ExtractValue(child, fieldName, resolver)
             }
           case other =>
-            throw new AnalysisException("need a map field but got " + other.catalogString)
+            throw QueryCompilationErrors.dataTypeMismatchForDeserializerError(other, "map")
         }
     }
     validateNestedTupleFields(result)
@@ -3227,8 +3208,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     }

     private def fail(schema: StructType, maxOrdinal: Int): Unit = {
-      throw new AnalysisException(s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}" +
-        ", but failed as the number of fields does not line up.")
+      throw QueryCompilationErrors.fieldNumberMismatchForDeserializerError(schema, maxOrdinal)
     }

     /**
@@ -3287,10 +3267,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case n: NewInstance if n.childrenResolved && !n.resolved =>
         val outer = OuterScopes.getOuterScope(n.cls)
         if (outer == null) {
-          throw new AnalysisException(
-            s"Unable to generate an encoder for inner class `${n.cls.getName}` without " +
-              "access to the scope that this class was defined in.\n" +
-              "Try moving this class out of its parent class.")
+          throw QueryCompilationErrors.outerScopeFailureForNewInstanceError(n.cls.getName)
         }
         n.copy(outerPointer = Some(outer))
     }
@@ -3306,11 +3283,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         case l: LambdaVariable => "array element"
         case e => e.sql
       }
-      throw new AnalysisException(s"Cannot up cast $fromStr from " +
-        s"${from.dataType.catalogString} to ${to.catalogString}.\n" +
-        "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
-        "You can either add an explicit cast to the input data or choose a higher precision " +
-        "type of the field in the target object")
+      throw QueryCompilationErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
     }

     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
@@ -3321,8 +3294,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case u @ UpCast(child, _, _) if !child.resolved => u

       case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-        throw new AnalysisException(
-          s"UpCast only support DecimalType as AbstractDataType yet, but got: $target")
+        throw QueryCompilationErrors.unsupportedAbstractDataTypeForUpCastError(target)

       case UpCast(child, target, walkedTypePath) if target == DecimalType
           && child.dataType.isInstanceOf[DecimalType] =>
@@ -3501,8 +3473,8 @@ class Analyzer(override val catalogManager: CatalogManager)
             case Some(colName) =>
               ColumnPosition.after(colName)
             case None =>
-              throw new AnalysisException("Couldn't find the reference column for " +
-                s"$after at $parentName")
+              throw QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(after,
+                parentName)
           }
         case other => other
       }