[SPARK-35899][SQL] Utility to convert connector expressions to Catalyst

### What changes were proposed in this pull request?

This PR adds `V2ExpressionUtils`, a utility that converts public connector expressions into Catalyst expressions (a usage sketch follows the list below).

Notable differences:
- Switched to `QueryCompilationErrors` from an explicit `AnalysisException`.
- Decoupled the resolution logic for v2 references into separate methods (`resolveRef`, `resolveRefs`) so other places can reuse it.
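
To make the intent concrete, here is a minimal sketch of the conversion the utility performs. The `LocalRelation` plan and the `id` column are hypothetical, not part of the change, and `V2ExpressionUtils` is `private[sql]`, so this only compiles inside Spark's own `sql` packages:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.connector.expressions.{FieldReference, NullOrdering, SortDirection, SortValue}
import org.apache.spark.sql.types.IntegerType

// Hypothetical one-column plan to resolve references against.
val plan = LocalRelation(AttributeReference("id", IntegerType)())

// A public connector expression: ORDER BY id ASC NULLS FIRST.
val v2Expr = SortValue(FieldReference("id"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)

// The utility resolves `id` against the plan and returns a Catalyst SortOrder.
val sortOrder = V2ExpressionUtils.toCatalyst(v2Expr, plan).asInstanceOf[SortOrder]
```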

### Why are the changes needed?

These changes are needed because a growing number of places require this conversion logic, and it is better to implement it in a single place.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #33096 from aokolnychyi/spark-35899.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Anton Okolnychyi authored on 2021-06-25 18:04:07 -07:00, committed by Dongjoon Hyun
parent 67eddf2ffc
commit 63cd1314d2
2 changed files with 82 additions and 41 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala (new file)

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue}
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
 * A utility class that converts public connector expressions into Catalyst expressions.
 */
private[sql] object V2ExpressionUtils extends SQLConfHelper {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

  def resolveRef[T <: NamedExpression](
      ref: NamedReference,
      plan: LogicalPlan,
      resolver: Resolver = conf.resolver): T = {
    plan.resolve(ref.fieldNames, resolver) match {
      case Some(namedExpr) =>
        namedExpr.asInstanceOf[T]
      case None =>
        val name = ref.fieldNames.toSeq.quoted
        val outputString = plan.output.map(_.name).mkString(",")
        throw QueryCompilationErrors.cannotResolveAttributeError(name, outputString)
    }
  }

  def resolveRefs[T <: NamedExpression](
      refs: Seq[NamedReference],
      plan: LogicalPlan,
      resolver: Resolver = conf.resolver): Seq[T] = {
    refs.map(ref => resolveRef[T](ref, plan, resolver))
  }

  def toCatalyst(
      expr: V2Expression,
      query: LogicalPlan,
      resolver: Resolver = conf.resolver): Expression = {
    expr match {
      case SortValue(child, direction, nullOrdering) =>
        val catalystChild = toCatalyst(child, query, resolver)
        SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
      case IdentityTransform(ref) =>
        resolveRef[NamedExpression](ref, query, resolver)
      case ref: FieldReference =>
        resolveRef[NamedExpression](ref, query, resolver)
      case _ =>
        throw new AnalysisException(s"$expr is not currently supported")
    }
  }

  private def toCatalyst(direction: V2SortDirection): SortDirection = direction match {
    case V2SortDirection.ASCENDING => Ascending
    case V2SortDirection.DESCENDING => Descending
  }

  private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match {
    case V2NullOrdering.NULLS_FIRST => NullsFirst
    case V2NullOrdering.NULLS_LAST => NullsLast
  }
}
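
For illustration, a hedged sketch of the decoupled resolution entry points; the two-column `LocalRelation` plan and its column names are hypothetical, not part of the commit:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StringType}

// Hypothetical two-column plan standing in for a real query.
val plan = LocalRelation(
  AttributeReference("id", IntegerType)(),
  AttributeReference("name", StringType)())

// Resolves each v2 reference against plan.output; an unknown name throws
// QueryCompilationErrors.cannotResolveAttributeError instead of a bare AnalysisException.
val resolved = V2ExpressionUtils.resolveRefs[NamedExpression](
  Seq(FieldReference("id"), FieldReference("name")), plan)
```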

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala

@@ -17,12 +17,10 @@
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, NamedExpression, NullOrdering, NullsFirst, NullsLast, SortDirection, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils.toCatalyst
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
-import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue}
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -71,41 +69,4 @@ object DistributionAndOrderingUtils {
     case _ =>
       query
   }
-
-  private def toCatalyst(
-      expr: V2Expression,
-      query: LogicalPlan,
-      resolver: Resolver): Expression = {
-    // we cannot perform the resolution in the analyzer since we need to optimize expressions
-    // in nodes like OverwriteByExpression before constructing a logical write
-    def resolve(ref: FieldReference): NamedExpression = {
-      query.resolve(ref.parts, resolver) match {
-        case Some(attr) => attr
-        case None => throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}")
-      }
-    }
-
-    expr match {
-      case SortValue(child, direction, nullOrdering) =>
-        val catalystChild = toCatalyst(child, query, resolver)
-        SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
-      case IdentityTransform(ref) =>
-        resolve(ref)
-      case ref: FieldReference =>
-        resolve(ref)
-      case _ =>
-        throw new AnalysisException(s"$expr is not currently supported")
-    }
-  }
-
-  private def toCatalyst(direction: V2SortDirection): SortDirection = direction match {
-    case V2SortDirection.ASCENDING => Ascending
-    case V2SortDirection.DESCENDING => Descending
-  }
-
-  private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match {
-    case V2NullOrdering.NULLS_FIRST => NullsFirst
-    case V2NullOrdering.NULLS_LAST => NullsLast
-  }
 }
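
With the private helpers above deleted, a caller such as `DistributionAndOrderingUtils` can delegate to the shared utility. A minimal sketch of what such a call site can look like; the helper name `sortByV2Ordering` is hypothetical, and the surviving `prepareQuery` body is not shown in this hunk:

```scala
import org.apache.spark.sql.catalyst.expressions.{SortOrder, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression}

// Hypothetical helper: sort `query` by connector-supplied sort expressions,
// reusing the shared conversion instead of a private toCatalyst copy.
def sortByV2Ordering(ordering: Seq[V2Expression], query: LogicalPlan): LogicalPlan = {
  val sortOrder = ordering.map { e =>
    V2ExpressionUtils.toCatalyst(e, query).asInstanceOf[SortOrder]
  }
  if (sortOrder.nonEmpty) Sort(sortOrder, global = false, query) else query
}
```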