[SPARK-34555][SQL] Resolve metadata output from DataFrame

### What changes were proposed in this pull request?

Adds `metadataOutput` as a fallback for attribute resolution, so metadata columns can be resolved by name.
Builds off https://github.com/apache/spark/pull/31654.

### Why are the changes needed?

The metadata columns could not be resolved via `df.col("metadataColName")` from the DataFrame API.

### Does this PR introduce _any_ user-facing change?

Yes, the metadata columns can now be resolved as described above.
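
For example (a minimal sketch, not taken from the patch itself: `testcat.ns1.ns2.tbl` is a placeholder for any DSv2 table whose source exposes `index` and `_partition` metadata columns, as the in-memory test catalog does):

```scala
// Sketch only: "testcat.ns1.ns2.tbl" is a hypothetical DSv2 table whose source
// exposes the metadata columns "index" and "_partition".
val table = spark.table("testcat.ns1.ns2.tbl")

// Before this change the metadata column failed to resolve through the DataFrame API;
// now Dataset.col falls back to the plan's metadataOutput.
table.select(table.col("id"), table.col("data"), table.col("_partition")).show()
```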

### How was this patch tested?

Scala unit test.

Closes #31668 from karenfeng/spark-34555.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit b01dd12805 (parent 56edb8156f), committed 2021-03-03
2 changed files with 25 additions and 1 deletion


@@ -95,6 +95,8 @@ abstract class LogicalPlan
   private[this] lazy val outputAttributes = AttributeSeq(output)
 
+  private[this] lazy val outputMetadataAttributes = AttributeSeq(metadataOutput)
+
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
@@ -115,6 +117,7 @@ abstract class LogicalPlan
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
     outputAttributes.resolve(nameParts, resolver)
+      .orElse(outputMetadataAttributes.resolve(nameParts, resolver))
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -124,7 +127,7 @@ abstract class LogicalPlan
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
+    resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
 
   /**
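
Pieced together, the resolution path in `LogicalPlan` after this patch reads roughly as follows (a reconstruction from the hunks above for readability, not the verbatim file). `Dataset.col` resolves names through `resolveQuoted`, which now delegates to `resolve`, so the DataFrame API picks up the metadata-output fallback:

```scala
def resolve(
    nameParts: Seq[String],
    resolver: Resolver): Option[NamedExpression] =
  outputAttributes.resolve(nameParts, resolver)
    // Metadata columns are consulted only when no regular output attribute matches,
    // so they can never shadow a real column.
    .orElse(outputMetadataAttributes.resolve(nameParts, resolver))

def resolveQuoted(
    name: String,
    resolver: Resolver): Option[NamedExpression] = {
  // Going through resolve() (instead of outputAttributes directly) is what lets
  // Dataset.col("...") see metadata columns as well.
  resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
}
```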


@@ -2607,6 +2607,27 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-34555: Resolve DataFrame metadata column") {
+    val tbl = s"${catalogAndNamespace}table"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      val table = spark.table(tbl)
+      val dfQuery = table.select(
+        table.col("id"),
+        table.col("data"),
+        table.col("index"),
+        table.col("_partition")
+      )
+
+      checkAnswer(
+        dfQuery,
+        Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))
+      )
+    }
+  }
+
   test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
     val tbl = s"${catalogAndNamespace}tbl"
     withTable(tbl) {