[SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources

### What changes were proposed in this pull request?

Extend DEFAULT column support in ALTER TABLE ADD COLUMNS commands to include V2 data sources.

Example:

```
> create or replace table t (a string default 'abc') using $v2Source
> insert into t values (default)
> alter table t add column (b string default 'def')
> insert into t values ('ghi')
> select * from t
"abc", "def"
"ghi", "def"
```

### Why are the changes needed?

This makes V2 data sources easier to use and extend.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR includes new test coverage.

Closes #36771 from dtenedor/default-cols-v2-tables.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
master
Daniel Tenedorio 2022-06-21 10:08:39 -07:00 committed by Gengliang Wang
parent 678826e482
commit db0e972c09
8 changed files with 81 additions and 22 deletions

View File

@ -79,7 +79,7 @@ public interface TableChange {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType) {
return new AddColumn(fieldNames, dataType, true, null, null);
return new AddColumn(fieldNames, dataType, true, null, null, null);
}
/**
@ -95,7 +95,7 @@ public interface TableChange {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
return new AddColumn(fieldNames, dataType, isNullable, null, null);
return new AddColumn(fieldNames, dataType, isNullable, null, null, null);
}
/**
@ -116,7 +116,7 @@ public interface TableChange {
DataType dataType,
boolean isNullable,
String comment) {
return new AddColumn(fieldNames, dataType, isNullable, comment, null);
return new AddColumn(fieldNames, dataType, isNullable, comment, null, null);
}
/**
@ -131,6 +131,7 @@ public interface TableChange {
* @param isNullable whether the new column can contain null
* @param comment the new field's comment string
* @param position the new column's position
* @param defaultValue default value to return when scanning from the new column, if any
* @return a TableChange for the addition
*/
static TableChange addColumn(
@ -138,8 +139,9 @@ public interface TableChange {
DataType dataType,
boolean isNullable,
String comment,
ColumnPosition position) {
return new AddColumn(fieldNames, dataType, isNullable, comment, position);
ColumnPosition position,
String defaultValue) {
return new AddColumn(fieldNames, dataType, isNullable, comment, position, defaultValue);
}
/**
@ -378,18 +380,21 @@ public interface TableChange {
private final boolean isNullable;
private final String comment;
private final ColumnPosition position;
private final String defaultValue;
private AddColumn(
String[] fieldNames,
DataType dataType,
boolean isNullable,
String comment,
ColumnPosition position) {
ColumnPosition position,
String defaultValue) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.isNullable = isNullable;
this.comment = comment;
this.position = position;
this.defaultValue = defaultValue;
}
@Override
@ -415,6 +420,9 @@ public interface TableChange {
return position;
}
@Nullable
public String defaultValue() { return defaultValue; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -424,7 +432,8 @@ public interface TableChange {
Arrays.equals(fieldNames, addColumn.fieldNames) &&
dataType.equals(addColumn.dataType) &&
Objects.equals(comment, addColumn.comment) &&
Objects.equals(position, addColumn.position);
Objects.equals(position, addColumn.position) &&
Objects.equals(defaultValue, addColumn.defaultValue);
}
@Override

View File

@ -119,7 +119,8 @@ case class AddColumns(
col.dataType,
col.nullable,
col.comment.orNull,
col.position.map(_.position).orNull)
col.position.map(_.position).orNull,
col.default.orNull)
}
}
@ -154,7 +155,8 @@ case class ReplaceColumns(
col.dataType,
col.nullable,
col.comment.orNull,
null)
null,
col.default.orNull)
}
deleteChanges ++ addChanges
}

View File

@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@ -130,23 +131,34 @@ private[sql] object CatalogV2Util {
/**
* Apply schema changes to a schema and return the result.
*/
def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
def applySchemaChanges(
schema: StructType,
changes: Seq[TableChange],
tableProvider: Option[String],
statementType: String): StructType = {
changes.foldLeft(schema) { (schema, change) =>
change match {
case add: AddColumn =>
add.fieldNames match {
case Array(name) =>
val field = StructField(name, add.dataType, nullable = add.isNullable)
val newField = Option(add.comment).map(field.withComment).getOrElse(field)
addField(schema, newField, add.position())
val fieldWithDefault: StructField =
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
val fieldWithComment: StructField =
Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
addField(schema, fieldWithComment, add.position(), tableProvider, statementType)
case names =>
replace(schema, names.init, parent => parent.dataType match {
case parentType: StructType =>
val field = StructField(names.last, add.dataType, nullable = add.isNullable)
val newField = Option(add.comment).map(field.withComment).getOrElse(field)
Some(parent.copy(dataType = addField(parentType, newField, add.position())))
val fieldWithDefault: StructField =
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
val fieldWithComment: StructField =
Option(add.comment).map(fieldWithDefault.withComment)
.getOrElse(fieldWithDefault)
Some(parent.copy(dataType =
addField(parentType, fieldWithComment, add.position(), tableProvider,
statementType)))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
})
@ -176,7 +188,7 @@ private[sql] object CatalogV2Util {
throw new IllegalArgumentException("Field not found: " + name)
}
val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
addField(withFieldRemoved, oldField, update.position())
addField(withFieldRemoved, oldField, update.position(), tableProvider, statementType)
}
update.fieldNames() match {
@ -204,8 +216,10 @@ private[sql] object CatalogV2Util {
private def addField(
schema: StructType,
field: StructField,
position: ColumnPosition): StructType = {
if (position == null) {
position: ColumnPosition,
tableProvider: Option[String],
statementType: String): StructType = {
val newSchema: StructType = if (position == null) {
schema.add(field)
} else if (position.isInstanceOf[First]) {
StructType(field +: schema.fields)
@ -218,6 +232,7 @@ private[sql] object CatalogV2Util {
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
StructType(before ++ (field +: after))
}
constantFoldCurrentDefaultsToExistDefaults(newSchema, tableProvider, statementType)
}
private def replace(

View File

@ -118,6 +118,17 @@ case class StructField(
}
}
/**
* Updates the StructField with a new existence default value.
*/
def withExistenceDefaultValue(value: String): StructField = {
val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, value)
.build()
copy(metadata = newMetadata)
}
/**
* Return the existence default value of this StructField.
*/

View File

@ -119,7 +119,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {

View File

@ -149,7 +149,8 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val schema = CatalogV2Util.applySchemaChanges(
catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)

View File

@ -23,6 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@ -310,6 +311,26 @@ trait AlterTableTests extends SharedSparkSession {
}
}
test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD COLUMN") {
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
val t = s"${catalogAndNamespace}table_name"
withTable("t") {
sql(s"create table $t (a string) using $v2Format")
sql(s"alter table $t add column (b int default 2 + 3)")
val tableName = fullTableName(t)
val table = getTableMetadata(tableName)
assert(table.name === tableName)
assert(table.schema === new StructType()
.add("a", StringType)
.add(StructField("b", IntegerType)
.withCurrentDefaultValue("2 + 3")
.withExistenceDefaultValue("5")))
}
}
}
test("AlterTable: add complex column") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {

View File

@ -110,7 +110,7 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
Option(tables.get(ident)) match {
case Some(table) =>
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {