[SPARK-36642][SQL] Add df.withMetadata: syntactic sugar to update the metadata of a DataFrame

### What changes were proposed in this pull request?

To make semantic annotations easier to use and modify, we want a shorter API for updating the metadata of a column in a DataFrame. Currently, updating the metadata without changing the column name requires `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`, which is too verbose. This PR adds a syntactic sugar API, `df.withMetadata("col1", metadata=metadata)`, that achieves the same result.
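The equivalence described above can be sketched in Scala as follows. This is a minimal illustration rather than code from the patch: the local `SparkSession`, the sample data, and the names `verbose` and `concise` are assumptions, and the snippet is meant for `spark-shell` or a Scala script running against a Spark build that already includes `withMetadata` (3.3.0 or later, per the `@since` tag in the diff).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

// Assumption: a throwaway local session, used only for this illustration.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("withMetadata-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("col1")
val metadata = new MetadataBuilder().putLong("key", 1L).build()

// Existing, verbose form: re-alias the column to its own name with new metadata.
val verbose = df.withColumn("col1", col("col1").as("col1", metadata))

// Shorthand added by this change.
val concise = df.withMetadata("col1", metadata)

// Both forms should leave the column name alone and attach the same metadata.
assert(verbose.schema("col1").metadata == metadata)
assert(concise.schema("col1").metadata == metadata)
```

The shorthand names the column once instead of three times, which removes the chance of the alias drifting from the column name.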

### Why are the changes needed?

Some background on how often such updates happen: we are working on inferring semantic data types for use in AutoML, and we store the semantic annotations in the column metadata. In many cases we will therefore suggest that users update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.
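As a rough sketch of that workflow, the snippet below corrects an annotation on one column. The metadata key `semantic_type`, its values, and the `zip` column are hypothetical and are not defined by this PR or by Spark. It only assumes that annotations live in ordinary column metadata, and it requires a Spark build that includes `withMetadata`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// Assumption: a throwaway local session, used only for this illustration.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("semantic-annotation-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical: pretend an inference step annotated the "zip" column as numeric.
val inferred = new MetadataBuilder().putString("semantic_type", "numeric").build()
val df = Seq("94105", "10001").toDF("zip").withMetadata("zip", inferred)

// The user corrects the annotation: copy the existing metadata, then override one key.
val corrected: Metadata = new MetadataBuilder()
  .withMetadata(df.schema("zip").metadata)
  .putString("semantic_type", "categorical")
  .build()
val fixed = df.withMetadata("zip", corrected)

assert(fixed.schema("zip").metadata.getString("semantic_type") == "categorical")
// Only the schema annotation changes; the rows themselves are untouched.
assert(fixed.collect().toSeq == df.collect().toSeq)
```

Starting from the existing metadata via `MetadataBuilder.withMetadata` keeps any other keys on the column, since `df.withMetadata` replaces the column's metadata as a whole.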

### Does this PR introduce _any_ user-facing change?

Yes.
A syntactic sugar API, `df.withMetadata("col1", metadata=metadata)`, that achieves the same result as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`.

### How was this patch tested?

A unit test in DataFrameSuite.scala.

Closes #33853 from liangz1/withMetadata.

Authored-by: Liang Zhang <liang.zhang@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Liang Zhang 2021-09-08 09:35:18 +08:00 committed by Weichen Xu
parent 71dbd03fbe
commit cb30683b65
2 changed files with 22 additions and 0 deletions


@@ -2491,6 +2491,16 @@ class Dataset[T] private[sql](
    }
  }

  /**
   * Returns a new Dataset by updating an existing column with metadata.
   *
   * @group untypedrel
   * @since 3.3.0
   */
  def withMetadata(columnName: String, metadata: Metadata): DataFrame = {
    withColumn(columnName, col(columnName), metadata)
  }

  /**
   * Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain
   * column name.


@@ -702,6 +702,18 @@ class DataFrameSuite extends QueryTest
      "The size of column names: 2 isn't equal to the size of metadata elements: 1"))
  }

  test("SPARK-36642: withMetadata: replace metadata of a column") {
    val metadata = new MetadataBuilder().putLong("key", 1L).build()
    val df1 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
    val df2 = df1.withMetadata("x", metadata)
    assert(df2.schema(0).metadata === metadata)

    val err = intercept[AnalysisException] {
      df1.withMetadata("x1", metadata)
    }
    assert(err.getMessage.contains("Cannot resolve column name"))
  }

  test("replace column using withColumn") {
    val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
    val df3 = df2.withColumn("x", df2("x") + 1)