[SPARK-31707][SQL] Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax

### What changes were proposed in this pull request?

This patch effectively reverts SPARK-30098 via the changes below (a short behavioral sketch follows the list):

* Removed the config
* Removed the changes done in parser rule
* Removed the usage of config in tests
  * Removed tests which depend on the config
  * Rolled back the tests affected by SPARK-30098 to their pre-SPARK-30098 state
* Reflected the change in the docs (migration guide, CREATE TABLE syntax reference)
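
In effect, `CREATE TABLE` without an explicit provider goes back to creating a Hive serde table, as it did in Spark 2.4.x, rather than a table backed by `spark.sql.sources.default`. A minimal sketch of the resulting behavior (table names are illustrative):

```sql
-- Without USING, the table is again a Hive serde table (text format unless
-- hive.default.fileformat says otherwise), matching Spark 2.4.x behavior.
CREATE TABLE t1 (id INT, name STRING);

-- To get a table backed by the default data source (parquet unless
-- spark.sql.sources.default is overridden), spell out the provider.
CREATE TABLE t2 (id INT, name STRING) USING parquet;
```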

### Why are the changes needed?

SPARK-30098 caused confusion and frustration around the CREATE TABLE DDL syntax, and there was consensus that the change had a negative effect.

Please go through the [discussion thread](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Resolve-ambiguous-parser-rule-between-two-quot-create-table-quot-s-td29051i20.html) to see the details.

### Does this PR introduce _any_ user-facing change?

No, compared to Spark 2.4.x. End users who experimented with the Spark 3.0.0 previews will see the behavior revert to that of Spark 2.4.x, but we do not guarantee compatibility for preview releases.

### How was this patch tested?

Existing UTs.

Closes #28517 from HeartSaVioR/revert-SPARK-30098.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

View file

@@ -40,8 +40,6 @@ license: |
### DDL Statements
- In Spark 3.0, `CREATE TABLE` without a specific provider uses the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and below, it was Hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.
- In Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception is thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and below, type conversions during table insertion are allowed as long as they are valid `Cast`. When inserting an out-of-range value to an integral field, the low-order bits of the value is inserted(the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value as "ANSI". Setting the option as "Legacy" restores the previous behavior.
- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

View file

@@ -28,7 +28,7 @@ The `CREATE TABLE` statement defines a new table using a Data Source.
```sql
CREATE TABLE [ IF NOT EXISTS ] table_identifier
[ ( col_name1 col_type1 [ COMMENT col_comment1 ], ... ) ]
[ USING data_source ]
USING data_source
[ OPTIONS ( key1=val1, key2=val2, ... ) ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ CLUSTERED BY ( col_name3, col_name4, ... )

View file

@@ -29,12 +29,6 @@ grammar SqlBase;
*/
public boolean legacy_exponent_literal_as_decimal_enabled = false;
/**
* When false, CREATE TABLE syntax without a provider will use
* the value of spark.sql.sources.default as its provider.
*/
public boolean legacy_create_hive_table_by_default_enabled = false;
/**
* Verify whether current token is a valid decimal token (which contains dot).
* Returns true if the character that follows the token is not a digit or letter or underscore.
@@ -123,12 +117,7 @@ statement
(RESTRICT | CASCADE)? #dropNamespace
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=STRING)? #showNamespaces
| {!legacy_create_hive_table_by_default_enabled}?
createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)? #createTable
| {legacy_create_hive_table_by_default_enabled}?
createTableHeader ('(' colTypeList ')')? tableProvider
| createTableHeader ('(' colTypeList ')')? tableProvider
createTableClauses
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?

View file

@@ -98,7 +98,6 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
lexer.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
lexer.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
lexer.SQL_standard_keyword_behavior = conf.ansiEnabled
val tokenStream = new CommonTokenStream(lexer)
@@ -108,7 +107,6 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
parser.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
parser.SQL_standard_keyword_behavior = conf.ansiEnabled
try {

View file

@@ -2228,15 +2228,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
.internal()
.doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
.internal()
@@ -3153,9 +3144,6 @@ class SQLConf extends Serializable with Logging {
def allowNegativeScaleOfDecimalEnabled: Boolean =
getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED)
def createHiveTableByDefaultEnabled: Boolean =
getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)
def truncateTableIgnorePermissionAcl: Boolean =
getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

View file

@@ -2196,21 +2196,20 @@ class DDLParserSuite extends AnalysisTest {
CommentOnTable(UnresolvedTable(Seq("a", "b", "c")), "xYz"))
}
test("create table - without using") {
withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "false") {
val sql = "CREATE TABLE 1m.2g(a INT)"
val expectedTableSpec = TableSpec(
Seq("1m", "2g"),
Some(new StructType().add("a", IntegerType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
None,
Map.empty[String, String],
None,
None)
// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("create table - without using") {
val sql = "CREATE TABLE 1m.2g(a INT)"
val expectedTableSpec = TableSpec(
Seq("1m", "2g"),
Some(new StructType().add("a", IntegerType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
None,
Map.empty[String, String],
None,
None)
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}

View file

@@ -256,8 +256,8 @@ class DataSourceV2SQLSuite
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
test("CreateTable: without USING clause") {
spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key, "false")
// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("CreateTable: without USING clause") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog
@@ -681,8 +681,8 @@ class DataSourceV2SQLSuite
}
}
test("CreateTableAsSelect: without USING clause") {
spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key, "false")
// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("CreateTableAsSelect: without USING clause") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog

View file

@@ -944,7 +944,7 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
withTable("t1") {
val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
val plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").queryExecution.executedPlan
assert(plan.isInstanceOf[DataWritingCommandExec])
assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
}
@@ -1005,7 +1005,7 @@ class AdaptiveQueryExecSuite
}
spark.sparkContext.addSparkListener(listener)
try {
sql("CREATE TABLE t1 AS SELECT 1 col").collect()
sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").collect()
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(checkDone)
} finally {

View file

@@ -40,8 +40,7 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
class DDLParserSuite extends AnalysisTest with SharedSparkSession {
private lazy val parser = new SparkSqlParser(new SQLConf().copy(
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED -> false))
private lazy val parser = new SparkSqlParser(new SQLConf)
private def assertUnsupported(sql: String, containsThesePhrases: Seq[String] = Seq()): Unit = {
val e = intercept[ParseException] {
@@ -76,12 +75,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
}.head
}
private def withCreateTableStatement(sql: String)(prediction: CreateTableStatement => Unit)
: Unit = {
val statement = parser.parsePlan(sql).asInstanceOf[CreateTableStatement]
prediction(statement)
}
test("alter database - property values must be set") {
assertUnsupported(
sql = "ALTER DATABASE my_db SET DBPROPERTIES('key_without_value', 'key_with_value'='x')",
@@ -487,17 +480,21 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
test("Test CTAS #3") {
val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
val statement = parser.parsePlan(s3).asInstanceOf[CreateTableAsSelectStatement]
assert(statement.tableName(0) == "page_view")
assert(statement.asSelect == parser.parsePlan("SELECT * FROM src"))
assert(statement.partitioning.isEmpty)
assert(statement.bucketSpec.isEmpty)
assert(statement.properties.isEmpty)
assert(statement.provider.isEmpty)
assert(statement.options.isEmpty)
assert(statement.location.isEmpty)
assert(statement.comment.isEmpty)
assert(!statement.ifNotExists)
val (desc, exists) = extractTableDesc(s3)
assert(exists == false)
assert(desc.identifier.database == None)
assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
assert(desc.schema.isEmpty)
assert(desc.viewText == None) // TODO will be SQLText
assert(desc.viewQueryColumnNames.isEmpty)
assert(desc.storage.properties == Map())
assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(desc.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(desc.properties == Map())
}
test("Test CTAS #4") {
@@ -657,60 +654,67 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
test("create table - basic") {
val query = "CREATE TABLE my_table (id int, name string)"
withCreateTableStatement(query) { state =>
assert(state.tableName(0) == "my_table")
assert(state.tableSchema == new StructType().add("id", "int").add("name", "string"))
assert(state.partitioning.isEmpty)
assert(state.bucketSpec.isEmpty)
assert(state.properties.isEmpty)
assert(state.provider.isEmpty)
assert(state.options.isEmpty)
assert(state.location.isEmpty)
assert(state.comment.isEmpty)
assert(!state.ifNotExists)
}
val (desc, allowExisting) = extractTableDesc(query)
assert(!allowExisting)
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "my_table")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
assert(desc.partitionColumnNames.isEmpty)
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewQueryColumnNames.isEmpty)
assert(desc.storage.locationUri.isEmpty)
assert(desc.storage.inputFormat ==
Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(desc.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(desc.storage.properties.isEmpty)
assert(desc.properties.isEmpty)
assert(desc.comment.isEmpty)
}
test("create table - with database name") {
val query = "CREATE TABLE dbx.my_table (id int, name string)"
withCreateTableStatement(query) { state =>
assert(state.tableName(0) == "dbx")
assert(state.tableName(1) == "my_table")
}
val (desc, _) = extractTableDesc(query)
assert(desc.identifier.database == Some("dbx"))
assert(desc.identifier.table == "my_table")
}
test("create table - temporary") {
val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)"
val e = intercept[ParseException] { parser.parsePlan(query) }
assert(e.message.contains("CREATE TEMPORARY TABLE without a provider is not allowed."))
assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet"))
}
test("create table - external") {
val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'"
val e = intercept[ParseException] { parser.parsePlan(query) }
assert(e.message.contains("Operation not allowed: CREATE EXTERNAL TABLE ..."))
val (desc, _) = extractTableDesc(query)
assert(desc.tableType == CatalogTableType.EXTERNAL)
assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere")))
}
test("create table - if not exists") {
val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)"
withCreateTableStatement(query) { state =>
assert(state.ifNotExists)
}
val (_, allowExisting) = extractTableDesc(query)
assert(allowExisting)
}
test("create table - comment") {
val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'"
withCreateTableStatement(query) { state =>
assert(state.comment == Some("its hot as hell below"))
}
val (desc, _) = extractTableDesc(query)
assert(desc.comment == Some("its hot as hell below"))
}
test("create table - partitioned columns") {
val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (id)"
withCreateTableStatement(query) { state =>
val transform = IdentityTransform(FieldReference(Seq("id")))
assert(state.partitioning == Seq(transform))
}
val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
val (desc, _) = extractTableDesc(query)
assert(desc.schema == new StructType()
.add("id", "int")
.add("name", "string")
.add("month", "int"))
assert(desc.partitionColumnNames == Seq("month"))
}
test("create table - clustered by") {
@@ -726,22 +730,20 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
"""
val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
withCreateTableStatement(query1) { state =>
assert(state.bucketSpec.isDefined)
val bucketSpec = state.bucketSpec.get
assert(bucketSpec.numBuckets == numBuckets)
assert(bucketSpec.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec.sortColumnNames.isEmpty)
}
val (desc1, _) = extractTableDesc(query1)
assert(desc1.bucketSpec.isDefined)
val bucketSpec1 = desc1.bucketSpec.get
assert(bucketSpec1.numBuckets == numBuckets)
assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec1.sortColumnNames.isEmpty)
val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
withCreateTableStatement(query2) { state =>
assert(state.bucketSpec.isDefined)
val bucketSpec = state.bucketSpec.get
assert(bucketSpec.numBuckets == numBuckets)
assert(bucketSpec.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec.sortColumnNames.head.equals(sortColumn))
}
val (desc2, _) = extractTableDesc(query2)
assert(desc2.bucketSpec.isDefined)
val bucketSpec2 = desc2.bucketSpec.get
assert(bucketSpec2.numBuckets == numBuckets)
assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
}
test("create table(hive) - skewed by") {
@@ -811,9 +813,8 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
test("create table - properties") {
val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')"
withCreateTableStatement(query) { state =>
assert(state.properties == Map("k1" -> "v1", "k2" -> "v2"))
}
val (desc, _) = extractTableDesc(query)
assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
}
test("create table(hive) - everything!") {

View file

@@ -39,7 +39,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
private val originalCreateHiveTable = TestHive.conf.createHiveTableByDefaultEnabled
def testCases: Seq[(String, File)] = {
hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -59,7 +58,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
// (timestamp_*)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED, true)
RuleExecutor.resetMetrics()
}
@@ -70,8 +68,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED,
originalCreateHiveTable)
// For debugging dump some statistics about how much time was spent in various optimizer rules
logWarning(RuleExecutor.dumpTimeSpent())

View file

@@ -25,22 +25,6 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
class HiveShowCreateTableSuite extends ShowCreateTableSuite with TestHiveSingleton {
private var origCreateHiveTableConfig = false
protected override def beforeAll(): Unit = {
super.beforeAll()
origCreateHiveTableConfig =
spark.conf.get(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)
spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key, true)
}
protected override def afterAll(): Unit = {
spark.conf.set(
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key,
origCreateHiveTableConfig)
super.afterAll()
}
test("view") {
Seq(true, false).foreach { serde =>
withView("v1") {

View file

@@ -2706,33 +2706,6 @@ class HiveDDLSuite
}
}
test("SPARK-30098: create table without provider should " +
"use default data source under non-legacy mode") {
val catalog = spark.sessionState.catalog
withSQLConf(
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "false") {
withTable("s") {
val defaultProvider = conf.defaultDataSourceName
sql("CREATE TABLE s(a INT, b INT)")
val table = catalog.getTableMetadata(TableIdentifier("s"))
assert(table.provider === Some(defaultProvider))
}
}
}
test("SPARK-30098: create table without provider should " +
"use hive under legacy mode") {
val catalog = spark.sessionState.catalog
withSQLConf(
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "true") {
withTable("s") {
sql("CREATE TABLE s(a INT, b INT)")
val table = catalog.getTableMetadata(TableIdentifier("s"))
assert(table.provider === Some("hive"))
}
}
}
test("SPARK-30785: create table like a partitioned table") {
val catalog = spark.sessionState.catalog
withTable("sc_part", "ta_part") {

View file

@@ -87,8 +87,7 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte
SQLConf.withExistingConf(TestHive.conf)(super.withSQLConf(pairs: _*)(f))
test("Test the default fileformat for Hive-serde tables") {
withSQLConf("hive.default.fileformat" -> "orc",
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "true") {
withSQLConf("hive.default.fileformat" -> "orc") {
val (desc, exists) = extractTableDesc(
"CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
assert(exists)
@@ -97,8 +96,7 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte
assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}
withSQLConf("hive.default.fileformat" -> "parquet",
SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "true") {
withSQLConf("hive.default.fileformat" -> "parquet") {
val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
assert(exists)
val input = desc.storage.inputFormat