From c263c154080e54dd07aaa584913773314c3528e5 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 4 Mar 2020 00:55:26 +0800 Subject: [PATCH] [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables ### What changes were proposed in this pull request? For a v2 table created with `CREATE TABLE testcat.ns1.ns2.tbl (id bigint, name string) USING foo`, the following works as expected ``` SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl ``` , but a query that uses star(*) with a qualified column name ``` SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl [info] org.apache.spark.sql.AnalysisException: cannot resolve 'testcat.ns1.ns2.tbl.*' given input columns 'id, name'; ``` fails to resolve. This PR fixes the issue. ### Why are the changes needed? To fix the bug described above. ### Does this PR introduce any user-facing change? Yes, now `SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl` works as expected. ### How was this patch tested? Added a new test. Closes #27766 from imback82/fix_star_expression. 
Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/unresolved.scala | 35 +++++++------------ .../sql/connector/DataSourceV2SQLSuite.scala | 27 ++++++++++++++ 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 608f39c2d8..6048d98033 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -298,35 +298,26 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { /** - * Returns true if the nameParts match the qualifier of the attribute + * Returns true if the nameParts match the trailing elements of the qualifier of the attribute. * - * There are two checks: i) Check if the nameParts match the qualifier fully. - * E.g. SELECT db.t1.* FROM db1.t1 In this case, the nameParts is Seq("db1", "t1") and - * qualifier of the attribute is Seq("db1","t1") - * ii) If (i) is not true, then check if nameParts is only a single element and it - * matches the table portion of the qualifier - * - * E.g. SELECT t1.* FROM db1.t1 In this case nameParts is Seq("t1") and - * qualifier is Seq("db1","t1") - * SELECT a.* FROM db1.t1 AS a - * In this case nameParts is Seq("a") and qualifier for - * attribute is Seq("a") + * For example, the following should all return true: + * - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and + * qualifier is Seq("ns1", "ns2", "t"). + * - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and + * qualifier is Seq("ns1", "ns2", "t"). + * - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and + * qualifier is Seq("ns1", "ns2", "t"). 
*/ private def matchedQualifier( attribute: Attribute, nameParts: Seq[String], resolver: Resolver): Boolean = { - val qualifierList = attribute.qualifier - - val matched = nameParts.corresponds(qualifierList)(resolver) || { - // check if it matches the table portion of the qualifier - if (nameParts.length == 1 && qualifierList.nonEmpty) { - resolver(nameParts.head, qualifierList.last) - } else { - false - } + val qualifierList = if (nameParts.length == attribute.qualifier.length) { + attribute.qualifier + } else { + attribute.qualifier.takeRight(nameParts.length) } - matched + nameParts.corresponds(qualifierList)(resolver) } override def expand( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c074b3352e..bccdce73f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2342,6 +2342,33 @@ class DataSourceV2SQLSuite assert(e2.message.contains("It is not allowed to add database prefix")) } + test("SPARK-31015: star expression should work for qualified column names for v2 tables") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, name string) USING foo") + sql(s"INSERT INTO $t VALUES (1, 'hello')") + + def check(tbl: String): Unit = { + checkAnswer(sql(s"SELECT testcat.ns1.ns2.tbl.* FROM $tbl"), Row(1, "hello")) + checkAnswer(sql(s"SELECT ns1.ns2.tbl.* FROM $tbl"), Row(1, "hello")) + checkAnswer(sql(s"SELECT ns2.tbl.* FROM $tbl"), Row(1, "hello")) + checkAnswer(sql(s"SELECT tbl.* FROM $tbl"), Row(1, "hello")) + } + + // Test with qualified table name "testcat.ns1.ns2.tbl". + check(t) + + // Test if current catalog and namespace is respected in column resolution. 
+ sql("USE testcat.ns1.ns2") + check("tbl") + + val ex = intercept[AnalysisException] { + sql(s"SELECT ns1.ns2.ns3.tbl.* from $t") + } + assert(ex.getMessage.contains("cannot resolve 'ns1.ns2.ns3.tbl.*")) + } + } + private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams")