[SPARK-35899][SQL][FOLLOWUP] Utility to convert connector expressions to Catalyst

### What changes were proposed in this pull request?

This PR addresses post-review comments on PR #33096:
- removes the `private[sql]` modifier
- removes the option to pass a resolver, simplifying the API (see the caller-side sketch below)
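
A caller-side sketch of the simplified API (the `convert` helper and its arguments are illustrative, not part of the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression}

// Hypothetical call site: `expr` comes from a connector,
// `plan` is the resolved query the expression refers to.
def convert(expr: V2Expression, plan: LogicalPlan): Expression = {
  // Before this PR: V2ExpressionUtils.toCatalyst(expr, plan, resolver) with an
  // optional resolver parameter; now the resolver is always conf.resolver internally.
  V2ExpressionUtils.toCatalyst(expr, plan)
}
```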

### Why are the changes needed?

These changes are needed to simplify the utility API.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #33120 from aokolnychyi/spark-35899-follow-up.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Commit 8a21d2dcfe (parent c45a6f5d09), 2021-06-28 22:22:07 -07:00
2 changed files with 12 additions and 24 deletions

**V2ExpressionUtils.scala** (`org.apache.spark.sql.catalyst.expressions`)
```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -27,14 +26,11 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 /**
  * A utility class that converts public connector expressions into Catalyst expressions.
  */
-private[sql] object V2ExpressionUtils extends SQLConfHelper {
+object V2ExpressionUtils extends SQLConfHelper {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
-  def resolveRef[T <: NamedExpression](
-      ref: NamedReference,
-      plan: LogicalPlan,
-      resolver: Resolver = conf.resolver): T = {
-    plan.resolve(ref.fieldNames, resolver) match {
+  def resolveRef[T <: NamedExpression](ref: NamedReference, plan: LogicalPlan): T = {
+    plan.resolve(ref.fieldNames, conf.resolver) match {
       case Some(namedExpr) =>
         namedExpr.asInstanceOf[T]
       case None =>
@@ -44,25 +40,19 @@ private[sql] object V2ExpressionUtils extends SQLConfHelper {
     }
   }
 
-  def resolveRefs[T <: NamedExpression](
-      refs: Seq[NamedReference],
-      plan: LogicalPlan,
-      resolver: Resolver = conf.resolver): Seq[T] = {
-    refs.map(ref => resolveRef[T](ref, plan, resolver))
+  def resolveRefs[T <: NamedExpression](refs: Seq[NamedReference], plan: LogicalPlan): Seq[T] = {
+    refs.map(ref => resolveRef[T](ref, plan))
   }
 
-  def toCatalyst(
-      expr: V2Expression,
-      query: LogicalPlan,
-      resolver: Resolver = conf.resolver): Expression = {
+  def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
     expr match {
       case SortValue(child, direction, nullOrdering) =>
-        val catalystChild = toCatalyst(child, query, resolver)
+        val catalystChild = toCatalyst(child, query)
         SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
       case IdentityTransform(ref) =>
-        resolveRef[NamedExpression](ref, query, resolver)
+        resolveRef[NamedExpression](ref, query)
       case ref: FieldReference =>
-        resolveRef[NamedExpression](ref, query, resolver)
+        resolveRef[NamedExpression](ref, query)
       case _ =>
         throw new AnalysisException(s"$expr is not currently supported")
     }
```
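
As a minimal sketch of what the converter does (the column name and plan are hypothetical; `Expressions.sort` and `Expressions.column` are the public connector expression builders), a connector sort expression becomes a Catalyst `SortOrder`:

```scala
import org.apache.spark.sql.catalyst.expressions.{SortOrder, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{Expressions, NullOrdering, SortDirection}

// Hypothetical: `plan` is a resolved LogicalPlan that outputs a column named "ts".
def exampleSort(plan: LogicalPlan): SortOrder = {
  // Public connector expression: ORDER BY ts ASC NULLS FIRST.
  val v2Sort = Expressions.sort(
    Expressions.column("ts"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)

  // No resolver argument anymore: the utility resolves "ts" against `plan`
  // using conf.resolver and returns a Catalyst SortOrder.
  V2ExpressionUtils.toCatalyst(v2Sort, plan).asInstanceOf[SortOrder]
}
```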

**DistributionAndOrderingUtils.scala**
```diff
@@ -29,12 +29,10 @@ object DistributionAndOrderingUtils {
   def prepareQuery(write: Write, query: LogicalPlan, conf: SQLConf): LogicalPlan = write match {
     case write: RequiresDistributionAndOrdering =>
-      val resolver = conf.resolver
       val numPartitions = write.requiredNumPartitions()
       val distribution = write.requiredDistribution match {
-        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query, resolver))
-        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query, resolver))
+        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
+        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
         case _: UnspecifiedDistribution => Array.empty[Expression]
       }
@@ -55,7 +53,7 @@ object DistributionAndOrderingUtils {
     }
 
       val ordering = write.requiredOrdering.toSeq
-        .map(e => toCatalyst(e, query, resolver))
+        .map(e => toCatalyst(e, query))
         .asInstanceOf[Seq[SortOrder]]
 
       val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
```
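
For context, here is a minimal, hypothetical connector `Write` that would exercise this path; the class and column names are illustrative. `prepareQuery` turns its required distribution and ordering into Catalyst expressions via `toCatalyst`:

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, NullOrdering, SortDirection, SortOrder}
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

// Hypothetical Write: cluster rows by `bucket`, sort by `ts` within each partition.
class ExampleWrite extends Write with RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.column("bucket")))

  override def requiredOrdering(): Array[SortOrder] =
    Array(Expressions.sort(
      Expressions.column("ts"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
}
```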