[SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables

### What changes were proposed in this pull request?

For a v2 table created with `CREATE TABLE testcat.ns1.ns2.tbl (id bigint, name string) USING foo`, the following works as expected
```
SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl
```
, but a query that combines a qualified column name with star (`*`)
```
SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl
[info]   org.apache.spark.sql.AnalysisException: cannot resolve 'testcat.ns1.ns2.tbl.*' given input columns 'id, name';
```
fails to resolve. This PR fixes that issue.

### Why are the changes needed?

To fix the bug described above.

### Does this PR introduce any user-facing change?

Yes, now `SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl` works as expected.

### How was this patch tested?

Added a new test.

Closes #27766 from imback82/fix_star_expression.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2020-03-04 00:55:26 +08:00 committed by Wenchen Fan
parent 3ff2135686
commit c263c15408
2 changed files with 40 additions and 22 deletions

View file

@ -298,35 +298,26 @@ abstract class Star extends LeafExpression with NamedExpression {
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {
/**
* Returns true if the nameParts match the qualifier of the attribute
* Returns true if the nameParts match the trailing elements (a suffix) of the qualifier of the attribute.
*
* There are two checks: i) Check if the nameParts match the qualifier fully.
* E.g. SELECT db.t1.* FROM db1.t1 In this case, the nameParts is Seq("db1", "t1") and
* qualifier of the attribute is Seq("db1","t1")
* ii) If (i) is not true, then check if nameParts is only a single element and it
* matches the table portion of the qualifier
*
* E.g. SELECT t1.* FROM db1.t1 In this case nameParts is Seq("t1") and
* qualifier is Seq("db1","t1")
* SELECT a.* FROM db1.t1 AS a
* In this case nameParts is Seq("a") and qualifier for
* attribute is Seq("a")
* For example, the following should all return true:
* - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
* - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
* qualifier is Seq("ns1", "ns2", "t").
*/
private def matchedQualifier(
attribute: Attribute,
nameParts: Seq[String],
resolver: Resolver): Boolean = {
val qualifierList = attribute.qualifier
val matched = nameParts.corresponds(qualifierList)(resolver) || {
// check if it matches the table portion of the qualifier
if (nameParts.length == 1 && qualifierList.nonEmpty) {
resolver(nameParts.head, qualifierList.last)
} else {
false
}
val qualifierList = if (nameParts.length == attribute.qualifier.length) {
attribute.qualifier
} else {
attribute.qualifier.takeRight(nameParts.length)
}
matched
nameParts.corresponds(qualifierList)(resolver)
}
override def expand(

View file

@ -2342,6 +2342,33 @@ class DataSourceV2SQLSuite
assert(e2.message.contains("It is not allowed to add database prefix"))
}
// Verifies that `<qualifier>.*` expands correctly for a v2 table with a multi-part name,
// for every trailing portion of the fully qualified table name, and that a qualifier
// that does not match the table is rejected by the analyzer.
test("SPARK-31015: star expression should work for qualified column names for v2 tables") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id bigint, name string) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 'hello')")

    // Runs the star query once per qualifier form, from the fully qualified name down to
    // the bare table name, against the given relation reference.
    def check(tbl: String): Unit = {
      Seq(
        s"SELECT testcat.ns1.ns2.tbl.* FROM $tbl",
        s"SELECT ns1.ns2.tbl.* FROM $tbl",
        s"SELECT ns2.tbl.* FROM $tbl",
        s"SELECT tbl.* FROM $tbl"
      ).foreach { query =>
        checkAnswer(sql(query), Row(1, "hello"))
      }
    }

    // Test with qualified table name "testcat.ns1.ns2.tbl".
    check(tableName)

    // Test if current catalog and namespace is respected in column resolution.
    sql("USE testcat.ns1.ns2")
    check("tbl")

    // A qualifier containing a namespace part that is not in the table name must not resolve.
    val error = intercept[AnalysisException] {
      sql(s"SELECT ns1.ns2.ns3.tbl.* from $tableName")
    }
    assert(error.getMessage.contains("cannot resolve 'ns1.ns2.ns3.tbl.*"))
  }
}
private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")