From 816f6dd13eb35908bab8f1524c7629a5c6d585c6 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Wed, 14 Apr 2021 07:01:40 +0000 Subject: [PATCH] [SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN ### What changes were proposed in this pull request? Adds the duplicated common columns as hidden columns to the Projection used to rewrite NATURAL/USING JOINs. ### Why are the changes needed? Allows users to resolve either side of the NATURAL/USING JOIN's common keys. Previously, the user could only resolve the following columns: | Join type | Left key columns | Right key columns | | --- | --- | --- | | Inner | Yes | No | | Left | Yes | No | | Right | No | Yes | | Outer | No | No | ### Does this PR introduce _any_ user-facing change? Yes. The user can now symmetrically resolve the common columns from a NATURAL/USING JOIN. ### How was this patch tested? SQL-side tests. The behavior matches PostgreSQL and MySQL. Closes #31666 from karenfeng/spark-34527. Authored-by: Karen Feng Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/Analyzer.scala | 100 ++++-- .../sql/catalyst/analysis/unresolved.scala | 15 +- .../plans/logical/AnalysisHelper.scala | 1 + .../plans/logical/basicLogicalOperators.scala | 13 +- .../spark/sql/catalyst/util/package.scala | 26 +- .../v2/DataSourceV2Implicits.scala | 10 +- .../sql-tests/inputs/natural-join.sql | 53 +++ .../resources/sql-tests/inputs/using-join.sql | 70 ++++ .../sql-tests/results/natural-join.sql.out | 249 ++++++++++++- .../sql-tests/results/using-join.sql.out | 338 ++++++++++++++++++ .../apache/spark/sql/DataFrameJoinSuite.scala | 22 ++ 11 files changed, 843 insertions(+), 54 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/using-join.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/using-join.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d32ec06ab8..7a113965a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -917,41 +917,30 @@ class Analyzer(override val catalogManager: CatalogManager) * Adds metadata columns to output for child relations when nodes are missing resolved attributes. * * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]], - * but the relation's output does not include the metadata columns until the relation is replaced - * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the - * relation's output, the analyzer will detect that nothing produces the columns. + * but the relation's output does not include the metadata columns until the relation is replaced. + * Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing + * produces the columns. * * This rule only adds metadata columns when a node is resolved but is missing input from its * children. This ensures that metadata columns are not added to the plan unless they are used. By * checking only resolved nodes, this ensures that * expansion is already done so that metadata - * columns are not accidentally selected by *. + * columns are not accidentally selected by *. This rule resolves operators downwards to avoid + * projecting away metadata columns prematurely. 
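+ *
+ * For example (an illustrative sketch; the relation and metadata column names are
+ * hypothetical): given a v2 relation `t` that exposes a metadata column `index`,
+ * `SELECT index FROM t` resolves `index` against [[LogicalPlan.metadataOutput]], and this
+ * rule rewrites the relation so that its output actually produces the column. A plain
+ * `SELECT * FROM t` never picks it up, since star expansion skips metadata columns.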
*/ object AddMetadataColumns extends Rule[LogicalPlan] { - import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ - private def hasMetadataCol(plan: LogicalPlan): Boolean = { - plan.expressions.exists(_.find { - case a: Attribute => a.isMetadataCol - case _ => false - }.isDefined) - } + import org.apache.spark.sql.catalyst.util._ - private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match { - case r: DataSourceV2Relation => r.withMetadataColumns() - case _ => plan.withNewChildren(plan.children.map(addMetadataCol)) - } - - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { + // Add metadata output to all node types case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) => val inputAttrs = AttributeSet(node.children.flatMap(_.output)) - val metaCols = node.expressions.flatMap(_.collect { - case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a - }) + val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains) if (metaCols.isEmpty) { node } else { val newNode = addMetadataCol(node) - // We should not change the output schema of the plan. We should project away the extr + // We should not change the output schema of the plan. We should project away the extra // metadata columns if necessary. if (newNode.sameOutput(node)) { newNode @@ -960,6 +949,38 @@ class Analyzer(override val catalogManager: CatalogManager) } } } + + private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = { + plan.expressions.flatMap(_.collect { + case a: Attribute if a.isMetadataCol => a + case a: Attribute + if plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId)) => + plan.children.collectFirst { + case c if c.metadataOutput.exists(_.exprId == a.exprId) => + c.metadataOutput.find(_.exprId == a.exprId).get + }.get + }) + } + + private def hasMetadataCol(plan: LogicalPlan): Boolean = { + plan.expressions.exists(_.find { + case a: Attribute => + // If an attribute is resolved before being labeled as metadata + // (i.e. from the originating Dataset), we check with expression ID + a.isMetadataCol || + plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId)) + case _ => false + }.isDefined) + } + + private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match { + case r: DataSourceV2Relation => r.withMetadataColumns() + case p: Project => + p.copy( + projectList = p.metadataOutput ++ p.projectList, + child = addMetadataCol(p.child)) + case _ => plan.withNewChildren(plan.children.map(addMetadataCol)) + } } /** @@ -1898,10 +1919,10 @@ class Analyzer(override val catalogManager: CatalogManager) } /** - * This method tries to resolve expressions and find missing attributes recursively. Specially, - * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved - * attributes which are missed from child output. This method tries to find the missing - * attributes out and add into the projection. + * This method tries to resolve expressions and find missing attributes recursively. + * Specifically, when the expressions used in `Sort` or `Filter` contain unresolved attributes + * or resolved attributes which are missing from child output. This method tries to find the + * missing attributes and add them into the projection. 
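+ *
+ * For example (an illustrative sketch, assuming a table `t(a, b)`): in
+ * `SELECT b FROM t ORDER BY a`, the attribute `a` is missing from the child Project's
+ * output, so it is added to that projection so the Sort can be resolved; the caller then
+ * places another Project on top to restore the original output `b`.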
*/ private def resolveExprsAndAddMissingAttrs( exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = { @@ -3150,7 +3171,9 @@ class Analyzer(override val catalogManager: CatalogManager) joinType: JoinType, joinNames: Seq[String], condition: Option[Expression], - hint: JoinHint) = { + hint: JoinHint): LogicalPlan = { + import org.apache.spark.sql.catalyst.util._ + val leftKeys = joinNames.map { keyName => left.output.find(attr => resolver(attr.name, keyName)).getOrElse { throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left") @@ -3170,26 +3193,33 @@ class Analyzer(override val catalogManager: CatalogManager) val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att)) // the output list looks like: join keys, columns from left, columns from right - val projectList = joinType match { + val (projectList, hiddenList) = joinType match { case LeftOuter => - leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)) + (leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)), rightKeys) case LeftExistence(_) => - leftKeys ++ lUniqueOutput + (leftKeys ++ lUniqueOutput, Seq.empty) case RightOuter => - rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput + (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput, leftKeys) case FullOuter => // in full outer join, joinCols should be non-null if there is. val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } - joinedCols ++ + (joinedCols ++ lUniqueOutput.map(_.withNullability(true)) ++ - rUniqueOutput.map(_.withNullability(true)) + rUniqueOutput.map(_.withNullability(true)), + leftKeys ++ rightKeys) case _ : InnerLike => - leftKeys ++ lUniqueOutput ++ rUniqueOutput + (leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys) case _ => sys.error("Unsupported natural join type " + joinType) } - // use Project to trim unnecessary fields - Project(projectList, Join(left, right, joinType, newCondition, hint)) + // use Project to hide duplicated common keys + // propagate hidden columns from nested USING/NATURAL JOINs + val project = Project(projectList, Join(left, right, joinType, newCondition, hint)) + project.setTagValue( + Project.hiddenOutputTag, + hiddenList.map(_.markAsSupportsQualifiedStar()) ++ + project.child.metadataOutput.filter(_.supportsQualifiedStar)) + project } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 3b2f4ca79c..5001e2ea88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} -import org.apache.spark.sql.catalyst.util.quoteIfNeeded +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{DataType, Metadata, StructType} @@ -340,11 +340,11 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu * Returns true 
if the nameParts is a subset of the last elements of qualifier of the attribute. * * For example, the following should all return true: - * - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and + * - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and * qualifier is Seq("ns1", "ns2", "t"). - * - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and + * - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and * qualifier is Seq("ns1", "ns2", "t"). - * - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and + * - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and * qualifier is Seq("ns1", "ns2", "t"). */ private def matchedQualifier( @@ -366,10 +366,13 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu override def expand( input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { - // If there is no table specified, use all input attributes. + // If there is no table specified, use all non-hidden input attributes. if (target.isEmpty) return input.output - val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver)) + // If there is a table specified, use hidden input attributes as well + val hiddenOutput = input.metadataOutput.filter(_.supportsQualifiedStar) + val expandedAttributes = (hiddenOutput ++ input.output).filter( + matchedQualifier(_, target.get, resolver)) if (expandedAttributes.nonEmpty) return expandedAttributes diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala index 5a888de83c..9e9bc69a50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala @@ -145,6 +145,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => self.markRuleAsIneffective(ruleId) self } else { + afterRule.copyTagsFrom(self) afterRule } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 6ebb1beee8..b31e930e49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -25,17 +25,18 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition} +import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.catalyst.trees.TreePattern.{ INNER_LIKE_JOIN, JOIN, LEFT_SEMI_OR_ANTI_JOIN, NATURAL_LIKE_JOIN, OUTER_JOIN, TreePattern } -import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler /** - * When planning take() or collect() operations, this special node that is inserted at the top of + * When 
planning take() or collect() operations, this special node is inserted at the top of * the logical plan before invoking the query planner. * * Rules can pattern-match on this node in order to apply transformations that only take effect @@ -69,7 +70,6 @@ object Subquery { case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) - override def metadataOutput: Seq[Attribute] = Nil override def maxRows: Option[Long] = child.maxRows override lazy val resolved: Boolean = { @@ -86,10 +86,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) override lazy val validConstraints: ExpressionSet = getAllValidConstraints(projectList) + override def metadataOutput: Seq[Attribute] = + getTagValue(Project.hiddenOutputTag).getOrElse(Nil) + override protected def withNewChildInternal(newChild: LogicalPlan): Project = copy(child = newChild) } +object Project { + val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]("hidden_output") +} + /** * Applies a [[Generator]] to a stream of input rows, combining the * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 4f8fcd9324..33fe48d44d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{NumericType, StringType} +import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -201,4 +201,28 @@ package object util extends Logging { def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = { truncatedString(seq, "", sep, "", maxFields) } + + val METADATA_COL_ATTR_KEY = "__metadata_col" + + implicit class MetadataColumnHelper(attr: Attribute) { + /** + * If set, this metadata column is a candidate during qualified star expansions. 
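+ *
+ * For example (table names illustrative), the hidden right-side join key produced for an
+ * inner `t1 NATURAL JOIN t2` carries this flag, so `SELECT t2.* FROM t1 NATURAL JOIN t2`
+ * expands to include the key, while an unqualified `SELECT *` still excludes it.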
+ */ + val SUPPORTS_QUALIFIED_STAR = "__supports_qualified_star" + + def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) && + attr.metadata.getBoolean(METADATA_COL_ATTR_KEY) + + def supportsQualifiedStar: Boolean = attr.isMetadataCol && + attr.metadata.contains(SUPPORTS_QUALIFIED_STAR) && + attr.metadata.getBoolean(SUPPORTS_QUALIFIED_STAR) + + def markAsSupportsQualifiedStar(): Attribute = attr.withMetadata( + new MetadataBuilder() + .withMetadata(attr.metadata) + .putBoolean(METADATA_COL_ATTR_KEY, true) + .putBoolean(SUPPORTS_QUALIFIED_STAR, true) + .build() + ) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index c0d24e2366..efd3ffebf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap object DataSourceV2Implicits { - private val METADATA_COL_ATTR_KEY = "__metadata_col" - implicit class TableHelper(table: Table) { def asReadable: SupportsRead = { table match { @@ -101,11 +100,6 @@ object DataSourceV2Implicits { def toAttributes: Seq[AttributeReference] = asStruct.toAttributes } - implicit class MetadataColumnHelper(attr: Attribute) { - def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) && - attr.metadata.getBoolean(METADATA_COL_ATTR_KEY) - } - implicit class OptionsHelper(options: Map[String, String]) { def asOptions: CaseInsensitiveStringMap = { new CaseInsensitiveStringMap(options.asJava) diff --git a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql index 71a50157b7..060f15e3d2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql @@ -10,6 +10,19 @@ create temporary view nt2 as select * from values ("one", 5) as nt2(k, v2); +create temporary view nt3 as select * from values + ("one", 4), + ("two", 5), + ("one", 6) + as nt3(k, v3); + +create temporary view nt4 as select * from values + ("one", 7), + ("two", 8), + ("one", 9) + as nt4(k, v4); + +SELECT * FROM nt1 natural join nt2; SELECT * FROM nt1 natural join nt2 where k = "one"; @@ -18,3 +31,43 @@ SELECT * FROM nt1 natural left join nt2 order by v1, v2; SELECT * FROM nt1 natural right join nt2 order by v1, v2; SELECT count(*) FROM nt1 natural full outer join nt2; + +SELECT k FROM nt1 natural join nt2; + +SELECT k FROM nt1 natural join nt2 where k = 
"one"; + +SELECT nt1.* FROM nt1 natural join nt2; + +SELECT nt2.* FROM nt1 natural join nt2; + +SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq; + +SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq; + +SELECT nt1.*, nt2.* FROM nt1 natural join nt2; + +SELECT *, nt2.k FROM nt1 natural join nt2; + +SELECT nt1.k, nt2.k FROM nt1 natural join nt2; + +SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one"; + +SELECT * FROM (SELECT * FROM nt1 natural join nt2); + +SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2); + +SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2); + +SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2); + +SELECT * FROM nt1 natural join nt2 natural join nt3; + +SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3; + +SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k; + +SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k; + +SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k; + +SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4; diff --git a/sql/core/src/test/resources/sql-tests/inputs/using-join.sql b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql new file mode 100644 index 0000000000..336d19f0f2 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql @@ -0,0 +1,70 @@ +create temporary view nt1 as select * from values + ("one", 1), + ("two", 2), + ("three", 3) + as nt1(k, v1); + +create temporary view nt2 as select * from values + ("one", 1), + ("two", 22), + ("one", 5), + ("four", 4) + as nt2(k, v2); + +SELECT * FROM nt1 left outer join nt2 using (k); + +SELECT k FROM nt1 left outer join nt2 using (k); + +SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k); + +SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k); + +SELECT k, nt1.k FROM nt1 left outer join nt2 using (k); + +SELECT k, nt2.k FROM nt1 left outer join nt2 using (k); + +SELECT * FROM nt1 left semi join nt2 using (k); + +SELECT k FROM nt1 left semi join nt2 using (k); + +SELECT nt1.* FROM nt1 left semi join nt2 using (k); + +SELECT nt1.k FROM nt1 left semi join nt2 using (k); + +SELECT k, nt1.k FROM nt1 left semi join nt2 using (k); + +SELECT * FROM nt1 right outer join nt2 using (k); + +SELECT k FROM nt1 right outer join nt2 using (k); + +SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k); + +SELECT nt1.k, nt2.k FROM nt1 right outer join nt2 using (k); + +SELECT k, nt1.k FROM nt1 right outer join nt2 using (k); + +SELECT k, nt2.k FROM nt1 right outer join nt2 using (k); + +SELECT * FROM nt1 full outer join nt2 using (k); + +SELECT k FROM nt1 full outer join nt2 using (k); + +SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k); + +SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k); + +SELECT k, nt1.k FROM nt1 full outer join nt2 using (k); + +SELECT k, nt2.k FROM nt1 full outer join nt2 using (k); + +SELECT * FROM nt1 full outer join nt2 using (k); + +SELECT k FROM nt1 inner join nt2 using (k); + +SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k); + +SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k); + +SELECT k, nt1.k FROM nt1 inner join nt2 using (k); + +SELECT k, nt2.k FROM nt1 inner join nt2 using (k); diff --git a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out index 13f319700d..794e4725d9 100644 --- a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out +++ 
b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 6 +-- Number of queries: 29 -- !query @@ -26,6 +26,40 @@ struct<> +-- !query +create temporary view nt3 as select * from values + ("one", 4), + ("two", 5), + ("one", 6) + as nt3(k, v3) +-- !query schema +struct<> +-- !query output + + + +-- !query +create temporary view nt4 as select * from values + ("one", 7), + ("two", 8), + ("one", 9) + as nt4(k, v4) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one 1 1 +one 1 5 +two 2 22 + + -- !query SELECT * FROM nt1 natural join nt2 where k = "one" -- !query schema @@ -62,3 +96,216 @@ SELECT count(*) FROM nt1 natural full outer join nt2 struct -- !query output 4 + + +-- !query +SELECT k FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one +one +two + + +-- !query +SELECT k FROM nt1 natural join nt2 where k = "one" +-- !query schema +struct +-- !query output +one +one + + +-- !query +SELECT nt1.* FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one 1 +one 1 +two 2 + + +-- !query +SELECT nt2.* FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one 1 +one 5 +two 22 + + +-- !query +SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq +-- !query schema +struct +-- !query output +one 1 1 +one 1 5 +two 2 22 + + +-- !query +SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq +-- !query schema +struct +-- !query output +one +one +two + + +-- !query +SELECT nt1.*, nt2.* FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one 1 one 1 +one 1 one 5 +two 2 two 22 + + +-- !query +SELECT *, nt2.k FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one 1 1 one +one 1 5 one +two 2 22 two + + +-- !query +SELECT nt1.k, nt2.k FROM nt1 natural join nt2 +-- !query schema +struct +-- !query output +one one +one one +two two + + +-- !query +SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one" +-- !query schema +struct +-- !query output +one one +one one + + +-- !query +SELECT * FROM (SELECT * FROM nt1 natural join nt2) +-- !query schema +struct +-- !query output +one 1 1 +one 1 5 +two 2 22 + + +-- !query +SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2) +-- !query schema +struct +-- !query output +one 1 one 1 +one 1 one 5 +two 2 two 22 + + +-- !query +SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2) +-- !query schema +struct +-- !query output +1 one +1 one +2 two + + +-- !query +SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +cannot resolve 'nt2.k' given input columns: [__auto_generated_subquery_name.k, __auto_generated_subquery_name.v1, __auto_generated_subquery_name.v2]; line 1 pos 7 + + +-- !query +SELECT * FROM nt1 natural join nt2 natural join nt3 +-- !query schema +struct +-- !query output +one 1 1 4 +one 1 1 6 +one 1 5 4 +one 1 5 6 +two 2 22 5 + + +-- !query +SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3 +-- !query schema +struct +-- !query output +one 1 one 1 one 4 +one 1 one 1 one 6 +one 1 one 5 one 4 +one 1 one 5 one 6 +two 2 two 22 two 5 + + +-- !query +SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k +-- !query schema +struct +-- !query output +one 1 one 1 one 4 +one 1 one 1 one 6 +one 1 one 5 one 4 +one 1 one 5 one 6 
+two 2 two 22 two 5 + + +-- !query +SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k +-- !query schema +struct +-- !query output +one 1 1 one 4 +one 1 1 one 6 +one 1 5 one 4 +one 1 5 one 6 +two 2 22 two 5 + + +-- !query +SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k +-- !query schema +struct +-- !query output +one 1 1 one 4 +one 1 1 one 6 +one 1 5 one 4 +one 1 5 one 6 +two 2 22 two 5 + + +-- !query +SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4 +-- !query schema +struct +-- !query output +one 1 one 1 one 4 one 7 +one 1 one 1 one 4 one 9 +one 1 one 1 one 6 one 7 +one 1 one 1 one 6 one 9 +one 1 one 5 one 4 one 7 +one 1 one 5 one 4 one 9 +one 1 one 5 one 6 one 7 +one 1 one 5 one 6 one 9 +two 2 two 22 two 5 two 8 diff --git a/sql/core/src/test/resources/sql-tests/results/using-join.sql.out b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out new file mode 100644 index 0000000000..1d2ae9d96e --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out @@ -0,0 +1,338 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 31 + + +-- !query +create temporary view nt1 as select * from values + ("one", 1), + ("two", 2), + ("three", 3) + as nt1(k, v1) +-- !query schema +struct<> +-- !query output + + + +-- !query +create temporary view nt2 as select * from values + ("one", 1), + ("two", 22), + ("one", 5), + ("four", 4) + as nt2(k, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one 1 1 +one 1 5 +three 3 NULL +two 2 22 + + +-- !query +SELECT k FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one +one +three +two + + +-- !query +SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one 1 one 1 +one 1 one 5 +three 3 NULL NULL +two 2 two 22 + + +-- !query +SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +three NULL +two two + + +-- !query +SELECT k, nt1.k FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +three three +two two + + +-- !query +SELECT k, nt2.k FROM nt1 left outer join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +three NULL +two two + + +-- !query +SELECT * FROM nt1 left semi join nt2 using (k) +-- !query schema +struct +-- !query output +one 1 +two 2 + + +-- !query +SELECT k FROM nt1 left semi join nt2 using (k) +-- !query schema +struct +-- !query output +one +two + + +-- !query +SELECT nt1.* FROM nt1 left semi join nt2 using (k) +-- !query schema +struct +-- !query output +one 1 +two 2 + + +-- !query +SELECT nt1.k FROM nt1 left semi join nt2 using (k) +-- !query schema +struct +-- !query output +one +two + + +-- !query +SELECT k, nt1.k FROM nt1 left semi join nt2 using (k) +-- !query schema +struct +-- !query output +one one +two two + + +-- !query +SELECT * FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +four NULL 4 +one 1 1 +one 1 5 +two 2 22 + + +-- !query +SELECT k FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +four +one +one +two + + +-- !query +SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +NULL NULL four 4 +one 1 one 1 +one 1 one 5 +two 2 two 22 + + +-- !query +SELECT 
nt1.k, nt2.k FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +NULL four +one one +one one +two two + + +-- !query +SELECT k, nt1.k FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +four NULL +one one +one one +two two + + +-- !query +SELECT k, nt2.k FROM nt1 right outer join nt2 using (k) +-- !query schema +struct +-- !query output +four four +one one +one one +two two + + +-- !query +SELECT * FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +four NULL 4 +one 1 1 +one 1 5 +three 3 NULL +two 2 22 + + +-- !query +SELECT k FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +four +one +one +three +two + + +-- !query +SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +NULL NULL four 4 +one 1 one 1 +one 1 one 5 +three 3 NULL NULL +two 2 two 22 + + +-- !query +SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +NULL four +one one +one one +three NULL +two two + + +-- !query +SELECT k, nt1.k FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +four NULL +one one +one one +three three +two two + + +-- !query +SELECT k, nt2.k FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +four four +one one +one one +three NULL +two two + + +-- !query +SELECT * FROM nt1 full outer join nt2 using (k) +-- !query schema +struct +-- !query output +four NULL 4 +one 1 1 +one 1 5 +three 3 NULL +two 2 22 + + +-- !query +SELECT k FROM nt1 inner join nt2 using (k) +-- !query schema +struct +-- !query output +one +one +two + + +-- !query +SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k) +-- !query schema +struct +-- !query output +one 1 one 1 +one 1 one 5 +two 2 two 22 + + +-- !query +SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +two two + + +-- !query +SELECT k, nt1.k FROM nt1 inner join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +two two + + +-- !query +SELECT k, nt2.k FROM nt1 inner join nt2 using (k) +-- !query schema +struct +-- !query output +one one +one one +two two diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index c2555a1991..a803fa88ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -477,4 +477,26 @@ class DataFrameJoinSuite extends QueryTest checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2))) } + + test("SPARK-34527: Resolve common columns from USING JOIN") { + val joinDf = testData2.as("testData2").join( + testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter") + val dfQuery = joinDf.select( + $"a", $"testData2.a", $"testData2.b", $"testData3.a", $"testData3.b") + val dfQuery2 = joinDf.select( + $"a", testData2.col("a"), testData2.col("b"), testData3.col("a"), testData3.col("b")) + + Seq(dfQuery, dfQuery2).map { query => + checkAnswer(query, + Seq( + Row(1, 1, 1, 1, null), + Row(1, 1, 2, 1, null), + Row(2, 2, 1, 2, 2), + Row(2, 2, 2, 2, 2), + Row(3, 3, 1, null, null), + Row(3, 3, 2, null, null) + ) + ) + } + } }