[SPARK-36241][SQL] Support creating tables with null column

### What changes were proposed in this pull request?
Previously, creating tables with null-type columns was blocked in PR #28833 to follow Hive's behavior.
In this PR, I propose to restore the earlier behavior and support null-type columns in a table.

### Why are the changes needed?
A complex query can produce a column with null type. If this happens in the input query of a CTAS statement,
the statement fails because Spark doesn't allow creating a table with a null-type column. From the user's
perspective, it's hard to figure out why the complicated query produced a null-type column and how to fix it,
so removing this constraint is more user-friendly.
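
As a minimal sketch of the scenario (hypothetical view and column names; assumes a Hive-enabled session where plain `CREATE TABLE` creates a Hive text table, as in the updated `HiveDDLSuite` test):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ctas-null-column-sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.range(3).createOrReplaceTempView("source_view")

// The constant NULL has no concrete type, so the query schema contains a
// null (void) column.
val df = spark.sql("SELECT id, NULL AS placeholder FROM source_view")
df.printSchema() // the placeholder column has null type

// Before this PR the CTAS below failed at analysis time with
// "Cannot create tables with null type."; with this change it succeeds
// when the table format can store a void column (Hive text format here).
spark.sql("CREATE TABLE ctas_with_null_col AS SELECT id, NULL AS placeholder FROM source_view")
```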

### Does this PR introduce _any_ user-facing change?
Yes, this reverts the behavior change introduced in #28833. For example, the command below will succeed after this PR:
```sql
CREATE TABLE t (col_1 void, col_2 int)
```
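
The relaxation also applies to the programmatic path. A Scala sketch mirroring the updated `HiveDDLSuite` case for `spark.catalog.createTable` (the table name and `json` source come from that test; assumes an active session):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{NullType, StructType}

val spark = SparkSession.builder().getOrCreate()

// A schema containing a NullType column is no longer rejected at analysis time.
val schema = new StructType().add("c", NullType)
spark.catalog.createTable(
  tableName = "t",
  source = "json",
  schema = schema,
  options = Map.empty[String, String])

spark.sql("SELECT * FROM t").show() // empty result; the void column is accepted
```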

### How was this patch tested?
Newly added and existing test cases.
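
For reference, the new `HiveDDLSuite` assertions boil down to the following shape (a condensed sketch with illustrative table names, relying on the suite's `sql`, `checkAnswer`, `withTable` and `assertAnalysisError` helpers, not the verbatim test body):

```scala
withTable("t_hive", "t_parquet") {
  // Explicit VOID columns are now accepted where the format can store them...
  sql("CREATE TABLE t_hive (v VOID) USING hive")
  checkAnswer(sql("SELECT * FROM t_hive"), Seq.empty)

  // ...while formats that cannot store a void column keep their own errors
  // instead of the removed blanket "Cannot create tables with null type".
  assertAnalysisError(
    "CREATE TABLE t_parquet (v VOID) USING PARQUET",
    "Parquet data source does not support null data type")
}
```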

Closes #33488 from linhongliu-db/SPARK-36241-support-void-column.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8e7e14dc0d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Linhong Liu 2021-07-27 17:31:52 +08:00 committed by Wenchen Fan
parent dfa5c4dadc
commit 91b9de3d80
8 changed files with 28 additions and 147 deletions

View file

@@ -33,7 +33,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
@@ -46,7 +45,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
@@ -68,7 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -80,9 +77,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -95,7 +89,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -107,9 +100,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,

View file

@@ -1166,8 +1166,6 @@ case class AlterTableAlterColumn(
nullable: Option[Boolean],
comment: Option[String],
position: Option[FieldPosition]) extends AlterTableCommand {
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
dataType.foreach(failNullType)
override def operation: String = "update"

View file

@@ -25,9 +25,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@@ -377,22 +376,4 @@ private[sql] object CatalogV2Util {
.getOrElse(catalogManager.v2SessionCatalog)
.asTableCatalog
}
def failNullType(dt: DataType): Unit = {
def containsNullType(dt: DataType): Boolean = dt match {
case ArrayType(et, _) => containsNullType(et)
case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
case _ => dt.isInstanceOf[NullType]
}
if (containsNullType(dt)) {
throw QueryCompilationErrors.cannotCreateTablesWithNullTypeError()
}
}
def assertNoNullTypeInSchema(schema: StructType): Unit = {
schema.foreach { f =>
failNullType(f.dataType)
}
}
}

View file

@@ -1393,10 +1393,6 @@ private[spark] object QueryCompilationErrors {
new AnalysisException("multi-part identifier cannot be empty.")
}
def cannotCreateTablesWithNullTypeError(): Throwable = {
new AnalysisException(s"Cannot create tables with ${NullType.simpleString} type.")
}
def functionUnsupportedInV2CatalogError(): Throwable = {
new AnalysisException("function is only supported in v1 catalog")
}

View file

@@ -48,7 +48,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AlterTableAddColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
cols.foreach { c =>
@@ -72,7 +71,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AlterTableReplaceColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(_: V1Table) =>
throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
@@ -196,7 +194,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = false)
if (!isV2Provider(provider)) {
@@ -218,9 +215,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = true)
if (!isV2Provider(provider)) {
@@ -251,7 +245,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ ReplaceTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
@@ -268,9 +261,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError

View file

@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
@@ -277,8 +276,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
assertNoNullTypeInSchema(schema)
val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)

View file

@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
@@ -227,8 +226,6 @@ case class RelationConversions(
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
// This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}

View file

@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2394,114 +2393,47 @@ class HiveDDLSuite
}
}
test("SPARK-20680: do not support for null column datatype") {
withTable("t") {
withView("tabNullType") {
hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
hiveClient.runSqlHive("CREATE VIEW tabNullType AS SELECT NULL AS col FROM t")
checkAnswer(spark.table("tabNullType"), Row(null))
// No exception shows
val desc = spark.sql("DESC tabNullType").collect().toSeq
assert(desc.contains(Row("col", NullType.simpleString, null)))
}
}
// Forbid CTAS with null type
test("SPARK-36241: support creating tables with null datatype") {
// CTAS with null type
withTable("t1", "t2", "t3") {
assertAnalysisError(
"CREATE TABLE t1 USING PARQUET AS SELECT null as null_col",
"Cannot create tables with null type")
"CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col",
"Parquet data source does not support null data type")
assertAnalysisError(
"CREATE TABLE t2 AS SELECT null as null_col",
"Cannot create tables with null type")
"CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col",
"Unknown field type: void")
assertAnalysisError(
"CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col",
"Cannot create tables with null type")
sql("CREATE TABLE t3 AS SELECT NULL AS null_col")
checkAnswer(sql("SELECT * FROM t3"), Row(null))
}
// Forbid Replace table AS SELECT with null type
withTable("t") {
val v2Source = classOf[FakeV2Provider].getName
assertAnalysisError(
s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col",
"Cannot create tables with null type")
}
// Forbid creating table with VOID type in Spark
// Create table with null type
withTable("t1", "t2", "t3", "t4") {
assertAnalysisError(
"CREATE TABLE t1 (v VOID) USING PARQUET",
"Cannot create tables with null type")
"Parquet data source does not support null data type")
assertAnalysisError(
"CREATE TABLE t2 (v VOID) USING hive",
"Cannot create tables with null type")
assertAnalysisError(
"CREATE TABLE t3 (v VOID)",
"Cannot create tables with null type")
assertAnalysisError(
"CREATE TABLE t4 (v VOID) STORED AS PARQUET",
"Cannot create tables with null type")
"CREATE TABLE t2 (v VOID) STORED AS PARQUET",
"Unknown field type: void")
sql("CREATE TABLE t3 (v VOID) USING hive")
checkAnswer(sql("SELECT * FROM t3"), Seq.empty)
sql("CREATE TABLE t4 (v VOID)")
checkAnswer(sql("SELECT * FROM t4"), Seq.empty)
}
// Forbid Replace table with VOID type
// Create table with null type using spark.catalog.createTable
withTable("t") {
val v2Source = classOf[FakeV2Provider].getName
assertAnalysisError(
s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source",
"Cannot create tables with null type")
}
// Make sure spark.catalog.createTable with null type will fail
val schema1 = new StructType().add("c", NullType)
assertHiveTableNullType(schema1)
assertDSTableNullType(schema1)
val schema2 = new StructType()
.add("c", StructType(Seq(StructField.apply("c1", NullType))))
assertHiveTableNullType(schema2)
assertDSTableNullType(schema2)
val schema3 = new StructType().add("c", ArrayType(NullType))
assertHiveTableNullType(schema3)
assertDSTableNullType(schema3)
val schema4 = new StructType()
.add("c", MapType(StringType, NullType))
assertHiveTableNullType(schema4)
assertDSTableNullType(schema4)
val schema5 = new StructType()
.add("c", MapType(NullType, StringType))
assertHiveTableNullType(schema5)
assertDSTableNullType(schema5)
}
private def assertHiveTableNullType(schema: StructType): Unit = {
withTable("t") {
val e = intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "hive",
schema = schema,
options = Map("fileFormat" -> "parquet"))
}.getMessage
assert(e.contains("Cannot create tables with null type"))
}
}
private def assertDSTableNullType(schema: StructType): Unit = {
withTable("t") {
val e = intercept[AnalysisException] {
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = schema,
options = Map.empty[String, String])
}.getMessage
assert(e.contains("Cannot create tables with null type"))
val schema = new StructType().add("c", NullType)
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = schema,
options = Map.empty[String, String])
checkAnswer(sql("SELECT * FROM t"), Seq.empty)
}
}