diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index c680502cb3..87387b18db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -18,9 +18,13 @@ package org.apache.spark.sql.errors import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.ResolvedView +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType} @@ -31,6 +35,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType} * org.apache.spark.sql.catalyst.analysis.Analyzer. */ object QueryCompilationErrors { + def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = { new AnalysisException( s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " + @@ -159,6 +164,166 @@ object QueryCompilationErrors { s"Couldn't find the reference column for $after at $parentName") } + def windowSpecificationNotDefinedError(windowName: String): Throwable = { + new AnalysisException(s"Window specification $windowName is not defined in the WINDOW clause.") + } + + def selectExprNotInGroupByError(expr: Expression, groupByAliases: Seq[Alias]): Throwable = { + new AnalysisException(s"$expr doesn't show up in the GROUP BY list $groupByAliases") + } + + def groupingMustWithGroupingSetsOrCubeOrRollupError(): Throwable = { + new AnalysisException("grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + } + + def pandasUDFAggregateNotSupportedInPivotError(): Throwable = { + new AnalysisException("Pandas UDF aggregate expressions are currently not supported in pivot.") + } + + def aggregateExpressionRequiredForPivotError(sql: String): Throwable = { + new AnalysisException(s"Aggregate expression required for pivot, but '$sql' " + + "did not appear in any aggregate function.") + } + + def expectTableNotTempViewError(quoted: String, cmd: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table", + t.origin.line, t.origin.startPosition) + } + + def expectTableOrPermanentViewNotTempViewError( + quoted: String, cmd: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table or permanent view.", + t.origin.line, t.origin.startPosition) + } + + def viewDepthExceedsMaxResolutionDepthError( + identifier: TableIdentifier, maxNestedViewDepth: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"The depth of view $identifier exceeds the maximum " + + s"view resolution depth ($maxNestedViewDepth). Analysis is aborted to " + + s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " + + "around this.", t.origin.line, t.origin.startPosition) + } + + def insertIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Inserting into a view is not allowed. View: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Writing into a view is not allowed. View: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def writeIntoV1TableNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Cannot write into v1 table: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def expectTableNotViewError(v: ResolvedView, cmd: String, t: TreeNode[_]): Throwable = { + val viewStr = if (v.isTemp) "temp view" else "view" + new AnalysisException(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.", + t.origin.line, t.origin.startPosition) + } + + def starNotAllowedWhenGroupByOrdinalPositionUsedError(): Throwable = { + new AnalysisException( + "Star (*) is not allowed in select list when GROUP BY ordinal position is used") + } + + def invalidStarUsageError(prettyName: String): Throwable = { + new AnalysisException(s"Invalid usage of '*' in $prettyName") + } + + def orderByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"ORDER BY position $index is not in select list " + + s"(valid range is [1, $size])", t.origin.line, t.origin.startPosition) + } + + def groupByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"GROUP BY position $index is not in select list " + + s"(valid range is [1, $size])", t.origin.line, t.origin.startPosition) + } + + def generatorNotExpectedError(name: FunctionIdentifier, classCanonicalName: String): Throwable = { + new AnalysisException(s"$name is expected to be a generator. However, " + + s"its class is $classCanonicalName, which is not a generator.") + } + + def distinctOrFilterOnlyWithAggregateFunctionError(prettyName: String): Throwable = { + new AnalysisException("DISTINCT or FILTER specified, " + + s"but $prettyName is not an aggregate function") + } + + def nonDeterministicFilterInAggregateError(): Throwable = { + new AnalysisException("FILTER expression is non-deterministic, " + + "it cannot be used in aggregate functions") + } + + def aliasNumberNotMatchColumnNumberError( + columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = { + new AnalysisException("Number of column aliases does not match number of columns. " + + s"Number of column aliases: $columnSize; " + + s"number of columns: $outputSize.", t.origin.line, t.origin.startPosition) + } + + def aliasesNumberNotMatchUDTFOutputError( + aliasesSize: Int, aliasesNames: String): Throwable = { + new AnalysisException("The number of aliases supplied in the AS clause does not " + + s"match the number of columns output by the UDTF expected $aliasesSize " + + s"aliases but got $aliasesNames ") + } + + def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = { + new AnalysisException("window aggregate function with filter predicate is not supported yet.") + } + + def windowFunctionInsideAggregateFunctionNotAllowedError(): Throwable = { + new AnalysisException("It is not allowed to use a window function inside an aggregate " + + "function. Please use the inner window function in a sub-query.") + } + + def expressionWithoutWindowExpressionError(expr: NamedExpression): Throwable = { + new AnalysisException(s"$expr does not have any WindowExpression.") + } + + def expressionWithMultiWindowExpressionsError( + expr: NamedExpression, distinctWindowSpec: Seq[WindowSpecDefinition]): Throwable = { + new AnalysisException(s"$expr has multiple Window Specifications ($distinctWindowSpec)." + + "Please file a bug report with this error message, stack trace, and the query.") + } + + def windowFunctionNotAllowedError(clauseName: String): Throwable = { + new AnalysisException(s"It is not allowed to use window functions inside $clauseName clause") + } + + def cannotSpecifyWindowFrameError(prettyName: String): Throwable = { + new AnalysisException(s"Cannot specify window frame for $prettyName function") + } + + def windowFrameNotMatchRequiredFrameError( + f: SpecifiedWindowFrame, required: WindowFrame): Throwable = { + new AnalysisException(s"Window Frame $f must match the required frame $required") + } + + def windowFunctionWithWindowFrameNotOrderedError(wf: WindowFunction): Throwable = { + new AnalysisException(s"Window function $wf requires window to be ordered, please add " + + s"ORDER BY clause. For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " + + "ORDER BY window_ordering) from table") + } + + def cannotResolveUserSpecifiedColumnsError(col: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Cannot resolve column name $col", t.origin.line, t.origin.startPosition) + } + + def writeTableWithMismatchedColumnsError( + columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = { + new AnalysisException("Cannot write to table due to mismatched user specified column " + + s"size($columnSize) and data column size($outputSize)", t.origin.line, t.origin.startPosition) + } + + def multiTimeWindowExpressionsNotSupportedError(t: TreeNode[_]): Throwable = { + new AnalysisException("Multiple time window expressions would result in a cartesian product " + + "of rows, therefore they are currently not supported.", t.origin.line, t.origin.startPosition) + } + } - - diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7d1edbae9c..0d719b1f53 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -368,10 +368,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Lookup WindowSpecDefinitions. This rule works with unresolved children. case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions { case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => - val errorMessage = - s"Window specification $windowName is not defined in the WINDOW clause." - val windowSpecDefinition = - windowDefinitions.getOrElse(windowName, failAnalysis(errorMessage)) + val windowSpecDefinition = windowDefinitions.getOrElse(windowName, + throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName)) WindowExpression(c, windowSpecDefinition) } } @@ -515,7 +513,7 @@ class Analyzer(override val catalogManager: CatalogManager) val groupingSetsAttributes = selectedGroupByExprs.map { groupingSetExprs => groupingSetExprs.map { expr => val alias = groupByAliases.find(_.child.semanticEquals(expr)).getOrElse( - failAnalysis(s"$expr doesn't show up in the GROUP BY list $groupByAliases")) + throw QueryCompilationErrors.selectExprNotInGroupByError(expr, groupByAliases)) // Map alias to expanded attribute. expandedAttributes.find(_.semanticEquals(alias.toAttribute)).getOrElse( alias.toAttribute) @@ -619,11 +617,11 @@ class Analyzer(override val catalogManager: CatalogManager) val gid = a.groupingExpressions.last if (!gid.isInstanceOf[AttributeReference] || gid.asInstanceOf[AttributeReference].name != VirtualColumn.groupingIdName) { - failAnalysis(s"grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + throw QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError() } a.groupingExpressions.take(a.groupingExpressions.length - 1) }.getOrElse { - failAnalysis(s"grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + throw QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError() } } @@ -833,11 +831,9 @@ class Analyzer(override val catalogManager: CatalogManager) private def checkValidAggregateExpression(expr: Expression): Unit = expr match { case _: AggregateExpression => // OK and leave the argument check to CheckAnalysis. case expr: PythonUDF if PythonUDF.isGroupedAggPandasUDF(expr) => - failAnalysis("Pandas UDF aggregate expressions are currently not supported in pivot.") + throw QueryCompilationErrors.pandasUDFAggregateNotSupportedInPivotError() case e: Attribute => - failAnalysis( - s"Aggregate expression required for pivot, but '${e.sql}' " + - s"did not appear in any aggregate function.") + throw QueryCompilationErrors.aggregateExpressionRequiredForPivotError(e.sql) case e => e.children.foreach(checkValidAggregateExpression) } } @@ -886,7 +882,7 @@ class Analyzer(override val catalogManager: CatalogManager) } case u @ UnresolvedTable(ident, cmd) => lookupTempView(ident).foreach { _ => - u.failAnalysis(s"${ident.quoted} is a temp view. '$cmd' expects a table") + throw QueryCompilationErrors.expectTableNotTempViewError(ident.quoted, cmd, u) } u case u @ UnresolvedView(ident, cmd, allowTemp, _) => @@ -901,8 +897,8 @@ class Analyzer(override val catalogManager: CatalogManager) lookupTempView(ident) .map { _ => if (!allowTempView) { - u.failAnalysis( - s"${ident.quoted} is a temp view. '$cmd' expects a table or permanent view.") + throw QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError( + ident.quoted, cmd, u) } ResolvedView(ident.asIdentifier, isTemp = true) } @@ -1062,10 +1058,8 @@ class Analyzer(override val catalogManager: CatalogManager) val nestedViewDepth = AnalysisContext.get.nestedViewDepth val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth if (nestedViewDepth > maxNestedViewDepth) { - view.failAnalysis(s"The depth of view ${desc.identifier} exceeds the maximum " + - s"view resolution depth ($maxNestedViewDepth). Analysis is aborted to " + - s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to " + - "work around this.") + throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( + desc.identifier, maxNestedViewDepth, view) } SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) { executeSameContext(child) @@ -1087,7 +1081,7 @@ class Analyzer(override val catalogManager: CatalogManager) EliminateSubqueryAliases(relation) match { case v: View => - table.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.") + throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table) case other => i.copy(table = other) } @@ -1098,10 +1092,11 @@ class Analyzer(override val catalogManager: CatalogManager) lookupRelation(u.multipartIdentifier, u.options, false) .map(EliminateSubqueryAliases(_)) .map { - case v: View => write.failAnalysis( - s"Writing into a view is not allowed. View: ${v.desc.identifier}.") - case u: UnresolvedCatalogRelation => write.failAnalysis( - "Cannot write into v1 table: " + u.tableMeta.identifier) + case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError( + v.desc.identifier, write) + case u: UnresolvedCatalogRelation => + throw QueryCompilationErrors.writeIntoV1TableNotAllowedError( + u.tableMeta.identifier, write) case r: DataSourceV2Relation => write.withNewTable(r) case other => throw new IllegalStateException( "[BUG] unexpected plan returned by `lookupRelation`: " + other) @@ -1115,9 +1110,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTable(identifier, cmd) => lookupTableOrView(identifier).map { - case v: ResolvedView => - val viewStr = if (v.isTemp) "temp view" else "view" - u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.") + case v: ResolvedView => throw QueryCompilationErrors.expectTableNotViewError(v, cmd, u) case table => table }.getOrElse(u) @@ -1488,8 +1481,7 @@ class Analyzer(override val catalogManager: CatalogManager) // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) { - failAnalysis( - "Star (*) is not allowed in select list when GROUP BY ordinal position is used") + throw QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError() } else { a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child)) } @@ -1502,7 +1494,7 @@ class Analyzer(override val catalogManager: CatalogManager) } ) case g: Generate if containsStar(g.generator.children) => - failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF") + throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF") // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _, _) if !j.duplicateResolved => @@ -1762,7 +1754,7 @@ class Analyzer(override val catalogManager: CatalogManager) }) // count(*) has been replaced by count(1) case o if containsStar(o.children) => - failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'") + throw QueryCompilationErrors.invalidStarUsageError(s"expression '${o.prettyName}'") } } } @@ -1864,9 +1856,7 @@ class Analyzer(override val catalogManager: CatalogManager) if (index > 0 && index <= child.output.size) { SortOrder(child.output(index - 1), direction, nullOrdering, Seq.empty) } else { - s.failAnalysis( - s"ORDER BY position $index is not in select list " + - s"(valid range is [1, ${child.output.size}])") + throw QueryCompilationErrors.orderByPositionRangeError(index, child.output.size, s) } case o => o } @@ -1880,9 +1870,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedOrdinal(index) if index > 0 && index <= aggs.size => aggs(index - 1) case ordinal @ UnresolvedOrdinal(index) => - ordinal.failAnalysis( - s"GROUP BY position $index is not in select list " + - s"(valid range is [1, ${aggs.size}])") + throw QueryCompilationErrors.groupByPositionRangeError(index, aggs.size, ordinal) case o => o } Aggregate(newGroups, aggs, child) @@ -2089,9 +2077,8 @@ class Analyzer(override val catalogManager: CatalogManager) withPosition(u) { v1SessionCatalog.lookupFunction(name, children) match { case generator: Generator => generator - case other => - failAnalysis(s"$name is expected to be a generator. However, " + - s"its class is ${other.getClass.getCanonicalName}, which is not a generator.") + case other => throw QueryCompilationErrors.generatorNotExpectedError( + name, other.getClass.getCanonicalName) } } case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) => @@ -2102,22 +2089,21 @@ class Analyzer(override val catalogManager: CatalogManager) // AggregateExpression. case wf: AggregateWindowFunction => if (isDistinct || filter.isDefined) { - failAnalysis("DISTINCT or FILTER specified, " + - s"but ${wf.prettyName} is not an aggregate function") + throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + wf.prettyName) } else { wf } // We get an aggregate function, we need to wrap it in an AggregateExpression. case agg: AggregateFunction => if (filter.isDefined && !filter.get.deterministic) { - failAnalysis("FILTER expression is non-deterministic, " + - "it cannot be used in aggregate functions") + throw QueryCompilationErrors.nonDeterministicFilterInAggregateError } AggregateExpression(agg, Complete, isDistinct, filter) // This function is not an aggregate function, just return the resolved one. case other if (isDistinct || filter.isDefined) => - failAnalysis("DISTINCT or FILTER specified, " + - s"but ${other.prettyName} is not an aggregate function") + throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + other.prettyName) case e: String2TrimExpression if arguments.size == 2 => if (trimWarningEnabled.get) { log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." + @@ -2256,9 +2242,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Checks if the number of the aliases equals to the number of output columns // in the subquery. if (columnNames.size != outputAttrs.size) { - u.failAnalysis("Number of column aliases does not match number of columns. " + - s"Number of column aliases: ${columnNames.size}; " + - s"number of columns: ${outputAttrs.size}.") + throw QueryCompilationErrors.aliasNumberNotMatchColumnNumberError( + columnNames.size, outputAttrs.size, u) } val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => Alias(attr, aliasName)() @@ -2649,10 +2634,8 @@ class Analyzer(override val catalogManager: CatalogManager) } else if (names.isEmpty) { elementAttrs } else { - failAnalysis( - "The number of aliases supplied in the AS clause does not match the number of columns " + - s"output by the UDTF expected ${elementAttrs.size} aliases but got " + - s"${names.mkString(",")} ") + throw QueryCompilationErrors.aliasesNumberNotMatchUDTFOutputError( + elementAttrs.size, names.mkString(",")) } } } @@ -2761,8 +2744,7 @@ class Analyzer(override val catalogManager: CatalogManager) wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) case WindowExpression(ae: AggregateExpression, _) if ae.filter.isDefined => - failAnalysis( - "window aggregate function with filter predicate is not supported yet.") + throw QueryCompilationErrors.windowAggregateFunctionWithFilterNotSupportedError // Extract Windowed AggregateExpression case we @ WindowExpression( @@ -2775,8 +2757,7 @@ class Analyzer(override val catalogManager: CatalogManager) WindowExpression(newAgg, spec) case AggregateExpression(aggFunc, _, _, _, _) if hasWindowFunction(aggFunc.children) => - failAnalysis("It is not allowed to use a window function inside an aggregate " + - "function. Please use the inner window function in a sub-query.") + throw QueryCompilationErrors.windowFunctionInsideAggregateFunctionNotAllowedError // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...), // we need to extract SUM(x). @@ -2840,12 +2821,12 @@ class Analyzer(override val catalogManager: CatalogManager) // We do a final check and see if we only have a single Window Spec defined in an // expressions. if (distinctWindowSpec.isEmpty) { - failAnalysis(s"$expr does not have any WindowExpression.") + throw QueryCompilationErrors.expressionWithoutWindowExpressionError(expr) } else if (distinctWindowSpec.length > 1) { // newExpressionsWithWindowFunctions only have expressions with a single // WindowExpression. If we reach here, we have a bug. - failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." + - s"Please file a bug report with this error message, stack trace, and the query.") + throw QueryCompilationErrors.expressionWithMultiWindowExpressionsError( + expr, distinctWindowSpec) } else { val spec = distinctWindowSpec.head val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr)) @@ -2873,10 +2854,10 @@ class Analyzer(override val catalogManager: CatalogManager) def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case Filter(condition, _) if hasWindowFunction(condition) => - failAnalysis("It is not allowed to use window functions inside WHERE clause") + throw QueryCompilationErrors.windowFunctionNotAllowedError("WHERE") case UnresolvedHaving(condition, _) if hasWindowFunction(condition) => - failAnalysis("It is not allowed to use window functions inside HAVING clause") + throw QueryCompilationErrors.windowFunctionNotAllowedError("HAVING") // Aggregate with Having clause. This rule works with an unresolved Aggregate because // a resolved Aggregate will not have Window Functions. @@ -3076,10 +3057,10 @@ class Analyzer(override val catalogManager: CatalogManager) def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { case WindowExpression(wf: FrameLessOffsetWindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f => - failAnalysis(s"Cannot specify window frame for ${wf.prettyName} function") + throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName) case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != UnspecifiedFrame && wf.frame != f => - failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}") + throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame) case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, UnspecifiedFrame)) if wf.frame != UnspecifiedFrame => WindowExpression(wf, s.copy(frameSpecification = wf.frame)) @@ -3100,9 +3081,7 @@ class Analyzer(override val catalogManager: CatalogManager) object ResolveWindowOrder extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty => - failAnalysis(s"Window function $wf requires window to be ordered, please add ORDER BY " + - s"clause. For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " + - s"ORDER BY window_ordering) from table") + throw QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf) case WindowExpression(rank: RankLike, spec) if spec.resolved => val order = spec.orderSpec.map(_.child) WindowExpression(rank.withOrder(order), spec) @@ -3169,7 +3148,8 @@ class Analyzer(override val catalogManager: CatalogManager) i.userSpecifiedCols.map { col => i.table.resolve(Seq(col), resolver) - .getOrElse(i.table.failAnalysis(s"Cannot resolve column name $col")) + .getOrElse(throw QueryCompilationErrors.cannotResolveUserSpecifiedColumnsError( + col, i.table)) } } @@ -3178,9 +3158,8 @@ class Analyzer(override val catalogManager: CatalogManager) cols: Seq[NamedExpression], query: LogicalPlan): LogicalPlan = { if (cols.size != query.output.size) { - query.failAnalysis( - s"Cannot write to table due to mismatched user specified column size(${cols.size}) and" + - s" data column size(${query.output.size})") + throw QueryCompilationErrors.writeTableWithMismatchedColumnsError( + cols.size, query.output.size, query) } val nameToQueryExpr = cols.zip(query.output).toMap // Static partition columns in the table output should not appear in the column list @@ -3760,8 +3739,7 @@ object TimeWindowing extends Rule[LogicalPlan] { renamedPlan.withNewChildren(substitutedPlan :: Nil) } } else if (numWindowExpr > 1) { - p.failAnalysis("Multiple time window expressions would result in a cartesian product " + - "of rows, therefore they are currently not supported.") + throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p) } else { p // Return unchanged. Analyzer will throw exception later }