[SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources

### What changes were proposed in this pull request?
In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in file-based datasource or writing a dataset to such a datasource. In particular, add the following case:
```scala
case _: DayTimeIntervalType | _: YearMonthIntervalType => false
```
to all methods that override either of the following (see the sketch below the list):
- V2 `FileTable.supportsDataType()`
- V1 `FileFormat.supportDataType()`
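For reference, here is a minimal sketch of what a full override looks like with the new case in place, modeled on the Parquet implementation (the diff hunks below truncate the nested-type cases; other formats differ only in which nested types they accept):
```scala
import org.apache.spark.sql.types._

override def supportDataType(dataType: DataType): Boolean = dataType match {
  // New: reject ANSI interval types so the analyzer reports a proper error.
  case _: DayTimeIntervalType | _: YearMonthIntervalType => false

  case _: AtomicType => true

  case st: StructType => st.forall { f => supportDataType(f.dataType) }

  case ArrayType(elementType, _) => supportDataType(elementType)

  case MapType(keyType, valueType, _) =>
    supportDataType(keyType) && supportDataType(valueType)

  case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

  case _ => false
}
```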

### Why are the changes needed?
To improve the user experience with Spark SQL by outputting a proper error message at the analysis phase.
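For example, a hypothetical `spark-shell` session illustrating the new behavior; the error text matches what the new test below asserts, and the output path is purely illustrative:
```scala
// Writing an ANSI interval column to a built-in file-based
// datasource now fails at analysis time:
spark.sql("SELECT INTERVAL '1' YEAR AS i")
  .write
  .format("parquet")
  .save("/tmp/interval_data")
// org.apache.spark.sql.AnalysisException:
//   Cannot save interval data type into external storage.
```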

### Does this PR introduce _any_ user-facing change?
Yes, but ANSI interval types haven't been released yet, so for users this is new behavior.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
```

Closes #33580 from MaxGekk/interval-ban-in-ds.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Max Gekk 2021-08-03 20:30:20 +03:00
parent 92cdb17d1a
commit 67cbc93263
11 changed files with 45 additions and 2 deletions

```diff
@@ -71,6 +71,8 @@ private[sql] object AvroUtils extends Logging {
   }
 
   def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
```

```diff
@@ -148,6 +148,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
```

```diff
@@ -134,6 +134,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
```

```diff
@@ -247,6 +247,8 @@ class OrcFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
```

```diff
@@ -373,6 +373,8 @@ class ParquetFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
```

```diff
@@ -26,7 +26,7 @@
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType}
+import org.apache.spark.sql.types.{AtomicType, DataType, DayTimeIntervalType, StructType, UserDefinedType, YearMonthIntervalType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -55,6 +55,8 @@ case class CSVTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
```

```diff
@@ -55,6 +55,8 @@ case class JsonTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
```

```diff
@@ -49,6 +49,8 @@ case class OrcTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
```

```diff
@@ -49,6 +49,8 @@ case class ParquetTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
```

```diff
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 
 /**
@@ -33,6 +35,27 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
   protected def dataSourceFormat: String
   protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
 
+  test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
+    Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
+      withTempPath { dir =>
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"SELECT $i").write.format(dataSourceFormat).save(dir.getAbsolutePath)
+        }.getMessage
+        assert(errMsg.contains("Cannot save interval data type into external storage"))
+      }
+    }
+
+    // Check all built-in file-based datasources except libsvm, which requires a particular schema.
+    if (!Set("libsvm").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
+      Seq("INTERVAL DAY TO SECOND", "INTERVAL YEAR TO MONTH").foreach { it =>
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"CREATE TABLE t (i $it) USING $dataSourceFormat")
+        }.getMessage
+        assert(errMsg.contains("data source does not support"))
+      }
+    }
+  }
+
   test(s"Propagate Hadoop configs from $dataSourceFormat options to underlying file system") {
     withSQLConf(
       "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
```

```diff
@@ -194,6 +194,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
```