From 63cd1314d225d431b7b5ef9417e3af0c39a69a63 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 25 Jun 2021 18:04:07 -0700 Subject: [PATCH] [SPARK-35899][SQL] Utility to convert connector expressions to Catalyst ### What changes were proposed in this pull request? This PR adds a utility to convert public connector expressions to Catalyst expressions. Notable differences from the private helper previously defined in `DistributionAndOrderingUtils`: - Switched to `QueryCompilationErrors` from an explicit `AnalysisException` when a reference cannot be resolved (unsupported expressions still throw `AnalysisException` directly). - Decoupled the resolving logic for v2 references into separate methods to use in other places. ### Why are the changes needed? These changes are needed as more and more places require this logic and it is better to implement it in a single place. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #33096 from aokolnychyi/spark-35899. Authored-by: Anton Okolnychyi Signed-off-by: Dongjoon Hyun --- .../expressions/V2ExpressionUtils.scala | 80 +++++++++++++++++++ .../v2/DistributionAndOrderingUtils.scala | 43 +--------- 2 files changed, 82 insertions(+), 41 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala new file mode 100644 index 0000000000..b4a4fb1101 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * A utility class that converts public connector expressions into Catalyst expressions. 
+ */ +private[sql] object V2ExpressionUtils extends SQLConfHelper { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper + + def resolveRef[T <: NamedExpression]( + ref: NamedReference, + plan: LogicalPlan, + resolver: Resolver = conf.resolver): T = { + plan.resolve(ref.fieldNames, resolver) match { + case Some(namedExpr) => + namedExpr.asInstanceOf[T] + case None => + val name = ref.fieldNames.toSeq.quoted + val outputString = plan.output.map(_.name).mkString(",") + throw QueryCompilationErrors.cannotResolveAttributeError(name, outputString) + } + } + + def resolveRefs[T <: NamedExpression]( + refs: Seq[NamedReference], + plan: LogicalPlan, + resolver: Resolver = conf.resolver): Seq[T] = { + refs.map(ref => resolveRef[T](ref, plan, resolver)) + } + + def toCatalyst( + expr: V2Expression, + query: LogicalPlan, + resolver: Resolver = conf.resolver): Expression = { + expr match { + case SortValue(child, direction, nullOrdering) => + val catalystChild = toCatalyst(child, query, resolver) + SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty) + case IdentityTransform(ref) => + resolveRef[NamedExpression](ref, query, resolver) + case ref: FieldReference => + resolveRef[NamedExpression](ref, query, resolver) + case _ => + throw new AnalysisException(s"$expr is not currently supported") + } + } + + private def toCatalyst(direction: V2SortDirection): SortDirection = direction match { + case V2SortDirection.ASCENDING => Ascending + case V2SortDirection.DESCENDING => Descending + } + + private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match { + case V2NullOrdering.NULLS_FIRST => NullsFirst + case V2NullOrdering.NULLS_LAST => NullsLast + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala index 
2e6ce3967a..8e251b4ac5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala @@ -17,12 +17,10 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, NamedExpression, NullOrdering, NullsFirst, NullsLast, SortDirection, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils.toCatalyst import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, Sort} import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution} -import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue} import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf @@ -71,41 +69,4 @@ object DistributionAndOrderingUtils { case _ => query } - - private def toCatalyst( - expr: V2Expression, - query: LogicalPlan, - resolver: Resolver): Expression = { - - // we cannot perform the resolution in the analyzer since we need to optimize expressions - // in nodes like OverwriteByExpression before constructing a logical write - def resolve(ref: FieldReference): NamedExpression = { - query.resolve(ref.parts, resolver) match { - case Some(attr) => attr - case None => throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}") - } - } - - expr match { - case SortValue(child, direction, nullOrdering) => - 
val catalystChild = toCatalyst(child, query, resolver) - SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty) - case IdentityTransform(ref) => - resolve(ref) - case ref: FieldReference => - resolve(ref) - case _ => - throw new AnalysisException(s"$expr is not currently supported") - } - } - - private def toCatalyst(direction: V2SortDirection): SortDirection = direction match { - case V2SortDirection.ASCENDING => Ascending - case V2SortDirection.DESCENDING => Descending - } - - private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match { - case V2NullOrdering.NULLS_FIRST => NullsFirst - case V2NullOrdering.NULLS_LAST => NullsLast - } }