[SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version

### What changes were proposed in this pull request?
1. Add the following Avro files to the resource folder `external/avro/src/test/resources`:
   - Files saved by Spark 2.4.5 (cee4ecbb16) without meta info `org.apache.spark.version`
      - `before_1582_date_v2_4_5.avro` with a date column: `avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"int","logicalType":"date"},"null"]}]}`
      - `before_1582_timestamp_millis_v2_4_5.avro` with a timestamp column: `avro.schema	{"type":"record","name":"test","namespace":"logical","fields":[{"name":"dt","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}`
      - `before_1582_timestamp_micros_v2_4_5.avro` with a timestamp column: `avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}`
    - Files saved by Spark 2.4.6-rc3 (570848da7c) with the meta info `org.apache.spark.version	2.4.6`:
      - `before_1582_date_v2_4_6.avro` is similar to `before_1582_date_v2_4_5.avro` except for the Spark version in the Avro meta info.
      - `before_1582_timestamp_micros_v2_4_6.avro` is similar to `before_1582_timestamp_micros_v2_4_5.avro` except meta info.
      - `before_1582_timestamp_millis_v2_4_6.avro` is similar to `before_1582_timestamp_millis_v2_4_5.avro` except meta info.
2. Removed a few Avro files because they are replaced by the Avro files generated by Spark 2.4.5 above.
3. Add new test "generate test files for checking compatibility with Spark 2.4" to `AvroSuite` (marked as ignored). The Avro files above were generated by this test.
4. Modified "SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps" in `AvroSuite` to use the new Avro files.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By `AvroV1Suite` and `AvroV2Suite`.

Closes #28664 from MaxGekk/avro-update-resource-files.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Max Gekk 2020-05-29 05:18:37 +00:00 committed by Wenchen Fan
parent 45864faaf2
commit 37a1fb8d08
9 changed files with 98 additions and 19 deletions

Binary file not shown.

Binary file not shown.

View file

@ -19,7 +19,7 @@ package org.apache.spark.sql.avro
import java.io._
import java.net.URL
import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
import java.util.{Locale, UUID}
@ -38,7 +38,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, UTC}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@ -1529,23 +1529,82 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}
}
// It generates input files for the test below:
// "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps"
// Marked `ignore` because it is a one-off generator, run manually (on a Spark 2.4.x
// build) to refresh the checked-in resource files; it is not part of the regular suite.
ignore("SPARK-31855: generate test files for checking compatibility with Spark 2.4") {
// Destination directory in the source tree where the generated .avro resources land.
val resourceDir = "external/avro/src/test/resources"
// Version suffix baked into the generated file names (e.g. before_1582_date_v2_4_6.avro).
val version = "2_4_6"
// Writes `in` (string literals) cast to SQL type `t` as a single-partition Avro file
// into a temp dir, then copies the produced part file into `resourceDir` as `dstFile`.
// `options` lets a caller force a specific writer schema (e.g. timestamp-millis).
def save(
in: Seq[String],
t: String,
dstFile: String,
options: Map[String, String] = Map.empty): Unit = {
withTempDir { dir =>
in.toDF("dt")
.select($"dt".cast(t))
// One partition so exactly one part-*.avro file is produced below.
.repartition(1)
.write
.mode("overwrite")
.options(options)
.format("avro")
.save(dir.getCanonicalPath)
// Pick the single Avro part file and overwrite any existing resource of that name.
Files.copy(
dir.listFiles().filter(_.getName.endsWith(".avro")).head.toPath,
Paths.get(resourceDir, dstFile),
StandardCopyOption.REPLACE_EXISTING)
}
}
// Pin both the JVM default time zone and the session time zone to America/Los_Angeles
// so the pre-Gregorian (before 1582) values are rendered deterministically.
withDefaultTimeZone(LA) {
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId) {
// Ancient date value — exercises the Julian/Proleptic-Gregorian rebase path.
save(
Seq("1001-01-01"),
"date",
s"before_1582_date_v$version.avro")
// Timestamp written with an explicit timestamp-millis Avro logical type
// (the default writer schema would use timestamp-micros).
save(
Seq("1001-01-01 01:02:03.123"),
"timestamp",
s"before_1582_timestamp_millis_v$version.avro",
// scalastyle:off line.size.limit
Map("avroSchema" ->
s"""
| {
| "namespace": "logical",
| "type": "record",
| "name": "test",
| "fields": [
| {"name": "dt", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
| ]
| }
|""".stripMargin))
// scalastyle:on line.size.limit
// Timestamp with microsecond precision — uses the default timestamp-micros schema.
save(
Seq("1001-01-01 01:02:03.123456"),
"timestamp",
s"before_1582_timestamp_micros_v$version.avro")
}
}
}
test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
def checkReadMixedFiles(
fileName: String,
dt: String,
dataStr: String,
checkDefaultLegacyRead: String => Unit): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceAvroFilePath(fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("dt"))
// By default we should fail to write ancient datetime values.
var e = intercept[SparkException](df.write.format("avro").save(path3_0))
val e = intercept[SparkException](df.write.format("avro").save(path3_0))
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
// By default we should fail to read ancient datetime values.
e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])
checkDefaultLegacyRead(path2_4)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.format("avro").mode("overwrite").save(path3_0)
@ -1562,25 +1621,23 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
}
} else {
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("dt"))
val avroSchema =
s"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
| {"name": "dt", "type": {"type": "long", "logicalType": "$dt"}}
| ]
|}""".stripMargin
// By default we should fail to write ancient datetime values.
var e = intercept[SparkException] {
val e = intercept[SparkException] {
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
}
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
// By default we should fail to read ancient datetime values.
e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])
checkDefaultLegacyRead(path2_4)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0)
@ -1600,11 +1657,33 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}
}
checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
def failInRead(path: String): Unit = {
val e = intercept[SparkException](spark.read.format("avro").load(path).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])
}
def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect()
Seq(
// By default we should fail to read ancient datetime values when parquet files don't
// contain Spark version.
"2_4_5" -> failInRead _,
"2_4_6" -> successInRead _
).foreach { case (version, checkDefaultRead) =>
checkReadMixedFiles(
s"before_1582_date_v$version.avro",
"date",
"1001-01-01",
checkDefaultRead)
checkReadMixedFiles(
s"before_1582_timestamp_micros_v$version.avro",
"timestamp-micros",
"1001-01-01 01:02:03.123456",
checkDefaultRead)
checkReadMixedFiles(
s"before_1582_timestamp_millis_v$version.avro",
"timestamp-millis",
"1001-01-01 01:02:03.123",
checkDefaultRead)
}
}
test("SPARK-31183: rebasing microseconds timestamps in write") {