[SPARK-34793][SQL] Prohibit saving of day-time and year-month intervals
### What changes were proposed in this pull request? For all built-in datasources, prohibit saving of year-month and day-time intervals that were introduced by SPARK-27793. We plan to support saving of such types at the milestone 2, see SPARK-27790. ### Why are the changes needed? To improve user experience with Spark SQL, and print nicer error message. Current error message might confuse users: ``` scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123") 21/03/18 22:44:35 ERROR FileFormatWriter: Aborting job 8de402d7-ab69-4dc0-aa8e-14ef06bd2d6b. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (192.168.1.66 executor driver): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:418) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:298) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:211) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Failed to convert value 1 (class of class java.lang.Integer}) with the type of YearMonthIntervalType to JSON. at scala.sys.package$.error(package.scala:30) at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:179) at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23$adapted(JacksonGenerator.scala:176) ``` ### Does this PR introduce _any_ user-facing change? Yes. After the changes, the example above: ``` scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123") org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage. ``` ### How was this patch tested? 1. Checked nested intervals: ``` scala> spark.range(1).selectExpr("""struct(timestamp'2021-01-02 00:01:02' - timestamp'2021-01-01 00:00:00')""").write.mode("overwrite").parquet("/Users/maximgekk/tmp/123") org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage. scala> Seq(Seq(java.time.Period.ofMonths(1))).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123") org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage. ``` 2. By running existing test suites: ``` $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2DataFrameSuite" $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite" ``` Closes #31884 from MaxGekk/ban-save-intervals. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
parent
6f89cdfb0c
commit
089c3b77e1
|
@ -101,16 +101,16 @@ object TypeUtils {
|
|||
}
|
||||
|
||||
def failWithIntervalType(dataType: DataType): Unit = {
|
||||
dataType match {
|
||||
case CalendarIntervalType =>
|
||||
throw new AnalysisException("Cannot use interval type in the table schema.")
|
||||
case ArrayType(et, _) => failWithIntervalType(et)
|
||||
case MapType(kt, vt, _) =>
|
||||
failWithIntervalType(kt)
|
||||
failWithIntervalType(vt)
|
||||
case s: StructType => s.foreach(f => failWithIntervalType(f.dataType))
|
||||
case u: UserDefinedType[_] => failWithIntervalType(u.sqlType)
|
||||
case _ =>
|
||||
invokeOnceForInterval(dataType) {
|
||||
throw new AnalysisException("Cannot use interval type in the table schema.")
|
||||
}
|
||||
}
|
||||
|
||||
def invokeOnceForInterval(dataType: DataType)(f: => Unit): Unit = {
|
||||
def isInterval(dataType: DataType): Boolean = dataType match {
|
||||
case CalendarIntervalType | DayTimeIntervalType | YearMonthIntervalType => true
|
||||
case _ => false
|
||||
}
|
||||
if (dataType.existsRecursively(isInterval)) f
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.spark.sql._
|
|||
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
|
||||
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
|
||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
|
||||
import org.apache.spark.sql.connector.catalog.TableProvider
|
||||
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
|
||||
import org.apache.spark.sql.execution.SparkPlan
|
||||
|
@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, Tex
|
|||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.sources._
|
||||
import org.apache.spark.sql.streaming.OutputMode
|
||||
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
|
||||
import org.apache.spark.sql.types.{DataType, StructField, StructType}
|
||||
import org.apache.spark.sql.util.SchemaUtils
|
||||
import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
|
||||
|
||||
|
@ -510,10 +510,7 @@ case class DataSource(
|
|||
physicalPlan: SparkPlan,
|
||||
metrics: Map[String, SQLMetric]): BaseRelation = {
|
||||
val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
|
||||
if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
|
||||
throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
|
||||
}
|
||||
|
||||
disallowWritingIntervals(outputColumns.map(_.dataType))
|
||||
providingInstance() match {
|
||||
case dataSource: CreatableRelationProvider =>
|
||||
dataSource.createRelation(
|
||||
|
@ -547,10 +544,7 @@ case class DataSource(
|
|||
* Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
|
||||
*/
|
||||
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
|
||||
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
|
||||
throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
|
||||
}
|
||||
|
||||
disallowWritingIntervals(data.schema.map(_.dataType))
|
||||
providingInstance() match {
|
||||
case dataSource: CreatableRelationProvider =>
|
||||
SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
|
||||
|
@ -579,6 +573,12 @@ case class DataSource(
|
|||
DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
|
||||
checkEmptyGlobPath, checkFilesExist, enableGlobbing = globPaths)
|
||||
}
|
||||
|
||||
private def disallowWritingIntervals(dataTypes: Seq[DataType]): Unit = {
|
||||
dataTypes.foreach(TypeUtils.invokeOnceForInterval(_) {
|
||||
throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
object DataSource extends Logging {
|
||||
|
|
Loading…
Reference in a new issue