diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index b4a4fb1101..72d072ff1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -27,14 +26,11 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 /**
  * A utility class that converts public connector expressions into Catalyst expressions.
  */
-private[sql] object V2ExpressionUtils extends SQLConfHelper {
+object V2ExpressionUtils extends SQLConfHelper {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
-  def resolveRef[T <: NamedExpression](
-      ref: NamedReference,
-      plan: LogicalPlan,
-      resolver: Resolver = conf.resolver): T = {
-    plan.resolve(ref.fieldNames, resolver) match {
+  def resolveRef[T <: NamedExpression](ref: NamedReference, plan: LogicalPlan): T = {
+    plan.resolve(ref.fieldNames, conf.resolver) match {
       case Some(namedExpr) =>
         namedExpr.asInstanceOf[T]
       case None =>
@@ -44,25 +40,19 @@ private[sql] object V2ExpressionUtils extends SQLConfHelper {
     }
   }
 
-  def resolveRefs[T <: NamedExpression](
-      refs: Seq[NamedReference],
-      plan: LogicalPlan,
-      resolver: Resolver = conf.resolver): Seq[T] = {
-    refs.map(ref => resolveRef[T](ref, plan, resolver))
+  def resolveRefs[T <: NamedExpression](refs: Seq[NamedReference], plan: LogicalPlan): Seq[T] = {
+    refs.map(ref => resolveRef[T](ref, plan))
   }
 
-  def toCatalyst(
-      expr: V2Expression,
-      query: LogicalPlan,
-      resolver: Resolver = conf.resolver): Expression = {
+  def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
     expr match {
       case SortValue(child, direction, nullOrdering) =>
-        val catalystChild = toCatalyst(child, query, resolver)
+        val catalystChild = toCatalyst(child, query)
         SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
       case IdentityTransform(ref) =>
-        resolveRef[NamedExpression](ref, query, resolver)
+        resolveRef[NamedExpression](ref, query)
       case ref: FieldReference =>
-        resolveRef[NamedExpression](ref, query, resolver)
+        resolveRef[NamedExpression](ref, query)
       case _ =>
         throw new AnalysisException(s"$expr is not currently supported")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index 8e251b4ac5..0d9146d31c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -29,12 +29,10 @@ object DistributionAndOrderingUtils {
 
   def prepareQuery(write: Write, query: LogicalPlan, conf: SQLConf): LogicalPlan = write match {
     case write: RequiresDistributionAndOrdering =>
-      val resolver = conf.resolver
-
       val numPartitions = write.requiredNumPartitions()
       val distribution = write.requiredDistribution match {
-        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query, resolver))
-        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query, resolver))
+        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
+        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
         case _: UnspecifiedDistribution => Array.empty[Expression]
       }
 
@@ -55,7 +53,7 @@ object DistributionAndOrderingUtils {
       }
 
       val ordering = write.requiredOrdering.toSeq
-        .map(e => toCatalyst(e, query, resolver))
+        .map(e => toCatalyst(e, query))
         .asInstanceOf[Seq[SortOrder]]
 
       val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
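
For context, a minimal sketch of what a call site looks like after this change (illustrative names only, not part of the patch): callers no longer thread a Resolver through, since resolveRef and toCatalyst now always read conf.resolver from the active SQLConf.

    import org.apache.spark.sql.catalyst.expressions.{NamedExpression, V2ExpressionUtils}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.connector.expressions.Expressions

    // Hypothetical helper: resolve a connector column reference against a
    // query's output. Case sensitivity comes from the active SQLConf inside
    // V2ExpressionUtils rather than from a caller-supplied Resolver.
    def resolveColumn(query: LogicalPlan): NamedExpression =
      V2ExpressionUtils.resolveRef[NamedExpression](Expressions.column("id"), query)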