[SPARK-36825][SQL] Read/write dataframes with ANSI intervals from/to parquet files
### What changes were proposed in this pull request?
Allow saving and loading of ANSI intervals (`YearMonthIntervalType` and `DayTimeIntervalType`) to/from the Parquet datasource. After the changes, Spark saves ANSI intervals as primitive physical Parquet types:
- year-month intervals as `INT32`
- day-time intervals as `INT64`

without any modifications. To load the values back as intervals, Spark stores the interval type information in the extra key `org.apache.spark.sql.parquet.row.metadata`:
```
$ java -jar parquet-tools-1.12.0.jar meta ./part-...-c000.snappy.parquet

creator:     parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)
extra:       org.apache.spark.version = 3.3.0
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[...,{"name":"i","type":"interval year to month","nullable":false,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
...
i:           REQUIRED INT32 R:0 D:0
```
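
For illustration, the value stored under that extra key is the Catalyst schema serialized as JSON. Below is a minimal sketch of reproducing the JSON shown above for a non-nullable year-month column named `i`; the exact output is an expectation, not copied from the PR:
```scala
import org.apache.spark.sql.types._

// The extra key org.apache.spark.sql.parquet.row.metadata holds the Catalyst
// schema as JSON; a non-nullable year-month interval column named "i" should
// serialize to roughly the snippet seen in the parquet-tools output above.
val schema = StructType(Seq(StructField("i", YearMonthIntervalType(), nullable = false)))
println(schema.json)
// expected (roughly):
// {"type":"struct","fields":[{"name":"i","type":"interval year to month","nullable":false,"metadata":{}}]}
```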

**Note:** This PR focuses on supporting ANSI intervals in the Parquet datasource when writing or reading them as columns of a `Dataset`.

### Why are the changes needed?
To improve user experience with Spark SQL. At the moment, users can create ANSI intervals "inside" Spark or parallelize Java collections of `Period`/`Duration` objects, but they cannot save the intervals to any built-in datasource. After the changes, users can save datasets/dataframes with year-month/day-time intervals and load them back later with Apache Spark.

For example:
```scala
scala> sql("select date'today' - date'2021-01-01' as diff").write.parquet("/Users/maximgekk/tmp/parquet_interval")

scala> val readback = spark.read.parquet("/Users/maximgekk/tmp/parquet_interval")
readback: org.apache.spark.sql.DataFrame = [diff: interval day]

scala> readback.printSchema
root
 |-- diff: interval day (nullable = true)

scala> readback.show
+------------------+
|              diff|
+------------------+
|INTERVAL '264' DAY|
+------------------+
```
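
As mentioned above, intervals built from Java `Period`/`Duration` objects can also be round-tripped. A minimal sketch for a Spark shell session with the changes in place; the path and column names here are made up for the example:
```scala
import java.time.{Duration, Period}
import spark.implicits._

// Period is encoded as a year-month interval, Duration as a day-time interval.
val df = Seq((Period.ofMonths(14), Duration.ofHours(30))).toDF("ym", "dt")
df.write.mode("overwrite").parquet("/tmp/parquet_interval_java_time")

val readback = spark.read.parquet("/tmp/parquet_interval_java_time")
readback.printSchema()
// expected: ym as "interval year to month", dt as a day-time interval type
readback.show(truncate = false)
```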

### Does this PR introduce _any_ user-facing change?
In some sense, yes. Before the changes, users got an error while saving ANSI intervals as dataframe columns to parquet files; after the changes, the operation completes successfully.

### How was this patch tested?
1. By running the existing test suites:
```
$ build/sbt "test:testOnly *ParquetFileFormatV2Suite"
$ build/sbt "test:testOnly *FileBasedDataSourceSuite"
$ build/sbt "sql/test:testOnly *JsonV2Suite"
```
2. Added new tests:
```
$ build/sbt "sql/test:testOnly *ParquetIOSuite"
$ build/sbt "sql/test:testOnly *ParquetSchemaSuite"
```

Closes #34057 from MaxGekk/ansi-interval-save-parquet.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-09-24 09:55:11 +03:00