From 96c2919988ddf78d104103876d8d8221e8145baa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 16 Jul 2021 01:13:32 +0800 Subject: [PATCH] [SPARK-36135][SQL] Support TimestampNTZ type in file partitioning ### What changes were proposed in this pull request? Support TimestampNTZ type in file partitioning * When there is no provided schema and the default Timestamp type is TimestampNTZ , Spark should infer and parse the timestamp value partitions as TimestampNTZ. * When the provided Partition schema is TimestampNTZ, Spark should be able to parse the TimestampNTZ type partition column. ### Why are the changes needed? File partitioning is an important feature and Spark should support TimestampNTZ type in it. ### Does this PR introduce _any_ user-facing change? Yes, Spark supports TimestampNTZ type in file partitioning ### How was this patch tested? Unit tests Closes #33344 from gengliangwang/partition. Authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../sql/catalyst/analysis/TypeCoercion.scala | 3 + .../vectorized/ColumnVectorUtils.java | 2 +- .../datasources/PartitioningUtils.scala | 22 +- .../ParquetPartitionDiscoverySuite.scala | 263 ++++++++++-------- 4 files changed, 170 insertions(+), 120 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index da0489499f..42c10e8a11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -867,6 +867,9 @@ object TypeCoercion extends TypeCoercionBase { case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) => Some(TimestampType) + case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) => + Some(TimestampNTZType) + case (t1, t2) => findTypeForComplex(t1, t2, findTightestCommonType) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 25aabcd086..2010d5cfa5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -94,7 +94,7 @@ public class ColumnVectorUtils { col.getChild(1).putLongs(0, capacity, c.microseconds); } else if (t instanceof DateType) { col.putInts(0, capacity, row.getInt(fieldIdx)); - } else if (t instanceof TimestampType) { + } else if (t instanceof TimestampType || t instanceof TimestampNTZType) { col.putLongs(0, capacity, row.getLong(fieldIdx)); } else { throw new RuntimeException(String.format("DataType %s is not supported" + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 91029ec171..a85f3409c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionVa import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, 
TimestampFormatter} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.unsafe.types.UTF8String @@ -480,14 +481,20 @@ object PartitioningUtils { val timestampTry = Try { val unescapedRaw = unescapePathName(raw) + // the inferred data type is consistent with the default timestamp type + val timestampType = SQLConf.get.timestampType // try and parse the date, if no exception occurs this is a candidate to be resolved as - // TimestampType - timestampFormatter.parse(unescapedRaw) + // TimestampType or TimestampNTZType + timestampType match { + case TimestampType => timestampFormatter.parse(unescapedRaw) + case TimestampNTZType => timestampFormatter.parseWithoutTimeZone(unescapedRaw) + } + // SPARK-23436: see comment for date - val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(zoneId.getId)).eval() + val timestampValue = Cast(Literal(unescapedRaw), timestampType, Some(zoneId.getId)).eval() // Disallow TimestampType if the cast returned null require(timestampValue != null) - TimestampType + timestampType } if (typeInference) { @@ -522,11 +529,12 @@ object PartitioningUtils { case _: DecimalType => Literal(new JBigDecimal(value)).value case DateType => Cast(Literal(value), DateType, Some(zoneId.getId)).eval() - case TimestampType => + // Timestamp types + case dt if AnyTimestampType.acceptsType(dt) => Try { - Cast(Literal(unescapePathName(value)), TimestampType, Some(zoneId.getId)).eval() + Cast(Literal(unescapePathName(value)), dt, Some(zoneId.getId)).eval() }.getOrElse { - Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), TimestampType).eval() + Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), dt).eval() } case dt => throw QueryExecutionErrors.typeUnsupportedError(dt) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index c152761fe7..defb96d6da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File import java.math.BigInteger import java.sql.Timestamp -import java.time.{ZoneId, ZoneOffset} -import java.util.{Calendar, Locale} +import java.time.{LocalDateTime, ZoneId, ZoneOffset} +import java.util.Locale import com.google.common.io.Files import org.apache.hadoop.fs.Path @@ -32,12 +32,13 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} -import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC +import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.internal.SQLConf +import 
org.apache.spark.sql.internal.SQLConf.TimestampTypes import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -82,12 +83,13 @@ abstract class ParquetPartitionDiscoverySuite check("1.5", DoubleType) check("hello", StringType) check("1990-02-24", DateType) - check("1990-02-24 12:00:30", TimestampType) - - val c = Calendar.getInstance(TimeZoneUTC) - c.set(1990, 1, 24, 12, 0, 30) - c.set(Calendar.MILLISECOND, 0) - check("1990-02-24 12:00:30", TimestampType, ZoneOffset.UTC) + // The inferred timestmap type is consistent with the value of `SQLConf.TIMESTAMP_TYPE` + Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType => + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) { + check("1990-02-24 12:00:30", SQLConf.get.timestampType) + check("1990-02-24 12:00:30", SQLConf.get.timestampType, ZoneOffset.UTC) + } + } check(defaultPartitionName, NullType) } @@ -366,31 +368,44 @@ abstract class ParquetPartitionDiscoverySuite s"hdfs://host:9000/path2"), PartitionSpec.emptySpec) - // The cases below check the resolution for type conflicts. - val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000 - val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000 - // Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set - // as a common type. - // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0) - // is set as a common type. - check(Seq( - s"hdfs://host:9000/path/a=$defaultPartitionName/b=0", - s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111", - s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"), - PartitionSpec( - StructType(Seq( - StructField("a", TimestampType), - StructField("b", DecimalType(22, 0)))), - Seq( - Partition( - InternalRow(null, Decimal(0)), - s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"), - Partition( - InternalRow(t1, Decimal(s"${Long.MaxValue}111")), - s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"), - Partition( - InternalRow(t2, null), - s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName")))) + // The inferred timestmap type is consistent with the value of `SQLConf.TIMESTAMP_TYPE` + Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType => + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) { + // The cases below check the resolution for type conflicts. + val t1 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) { + Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000 + } else { + localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:00:00")) + } + val t2 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) { + Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000 + } else { + localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00")) + } + // Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set + // as a common type. + // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0) + // is set as a common type. 
+ check(Seq( + s"hdfs://host:9000/path/a=$defaultPartitionName/b=0", + s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111", + s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"), + PartitionSpec( + StructType(Seq( + StructField("a", SQLConf.get.timestampType), + StructField("b", DecimalType(22, 0)))), + Seq( + Partition( + InternalRow(null, Decimal(0)), + s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"), + Partition( + InternalRow(t1, Decimal(s"${Long.MaxValue}111")), + s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"), + Partition( + InternalRow(t2, null), + s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName")))) + } + } } test("parse partitions with type inference disabled") { @@ -642,97 +657,121 @@ abstract class ParquetPartitionDiscoverySuite } test("Various partition value types") { - val row = - Row( - 100.toByte, - 40000.toShort, - Int.MaxValue, - Long.MaxValue, - 1.5.toFloat, - 4.5, - new java.math.BigDecimal(new BigInteger("212500"), 5), - new java.math.BigDecimal("2.125"), - java.sql.Date.valueOf("2015-05-23"), - new Timestamp(0), - "This is a string, /[]?=:", - "This is not a partition column") + Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType => + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) { + val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) { + new Timestamp(0) + } else { + LocalDateTime.parse("1970-01-01T00:00:00") + } + val row = + Row( + 100.toByte, + 40000.toShort, + Int.MaxValue, + Long.MaxValue, + 1.5.toFloat, + 4.5, + new java.math.BigDecimal(new BigInteger("212500"), 5), + new java.math.BigDecimal("2.125"), + java.sql.Date.valueOf("2015-05-23"), + ts, + "This is a string, /[]?=:", + "This is not a partition column") - // BooleanType is not supported yet - val partitionColumnTypes = - Seq( - ByteType, - ShortType, - IntegerType, - LongType, - FloatType, - DoubleType, - DecimalType(10, 5), - DecimalType.SYSTEM_DEFAULT, - DateType, - TimestampType, - StringType) + // BooleanType is not supported yet + val partitionColumnTypes = + Seq( + ByteType, + ShortType, + IntegerType, + LongType, + FloatType, + DoubleType, + DecimalType(10, 5), + DecimalType.SYSTEM_DEFAULT, + DateType, + SQLConf.get.timestampType, + StringType) - val partitionColumns = partitionColumnTypes.zipWithIndex.map { - case (t, index) => StructField(s"p_$index", t) - } + val partitionColumns = partitionColumnTypes.zipWithIndex.map { + case (t, index) => StructField(s"p_$index", t) + } - val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) - val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) + val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) + val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) - withTempPath { dir => - df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) - val fields = schema.map(f => Column(f.name).cast(f.dataType)) - checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) - } + withTempPath { dir => + df.write + .format("parquet") + .partitionBy(partitionColumns.map(_.name): _*) + .save(dir.toString) + val fields = schema.map(f => Column(f.name).cast(f.dataType)) + checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) + } - withTempPath { dir => - df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) - val fields = 
schema.map(f => Column(f.name).cast(f.dataType)) - checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .load(dir.toString).select(fields: _*), row) + withTempPath { dir => + df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) + val fields = schema.map(f => Column(f.name).cast(f.dataType)) + checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .load(dir.toString).select(fields: _*), row) + } + } } } test("Various inferred partition value types") { - val row = - Row( - Long.MaxValue, - 4.5, - new java.math.BigDecimal(new BigInteger("1" * 20)), - java.sql.Date.valueOf("2015-05-23"), - java.sql.Timestamp.valueOf("1990-02-24 12:00:30"), - "This is a string, /[]?=:", - "This is not a partition column") + Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType => + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) { + val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) { + Timestamp.valueOf("1990-02-24 12:00:30") + } else { + LocalDateTime.parse("1990-02-24T12:00:30") + } + val row = + Row( + Long.MaxValue, + 4.5, + new java.math.BigDecimal(new BigInteger("1" * 20)), + java.sql.Date.valueOf("2015-05-23"), + ts, + "This is a string, /[]?=:", + "This is not a partition column") - val partitionColumnTypes = - Seq( - LongType, - DoubleType, - DecimalType(20, 0), - DateType, - TimestampType, - StringType) + val partitionColumnTypes = + Seq( + LongType, + DoubleType, + DecimalType(20, 0), + DateType, + SQLConf.get.timestampType, + StringType) - val partitionColumns = partitionColumnTypes.zipWithIndex.map { - case (t, index) => StructField(s"p_$index", t) - } + val partitionColumns = partitionColumnTypes.zipWithIndex.map { + case (t, index) => StructField(s"p_$index", t) + } - val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) - val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) + val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) + val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) - withTempPath { dir => - df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) - val fields = schema.map(f => Column(f.name)) - checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) - } + withTempPath { dir => + df.write + .format("parquet") + .partitionBy(partitionColumns.map(_.name): _*) + .save(dir.toString) + val fields = schema.map(f => Column(f.name)) + checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) + } - withTempPath { dir => - df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) - val fields = schema.map(f => Column(f.name)) - checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .load(dir.toString).select(fields: _*), row) + withTempPath { dir => + df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) + val fields = schema.map(f => Column(f.name)) + checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .load(dir.toString).select(fields: _*), row) + } + } } } @@ -1346,4 +1385,4 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite { } } } -} \ No newline at end of file +}
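
A minimal sketch of the two user-facing behaviors listed in the PR description, mirroring the new cases in ParquetPartitionDiscoverySuite. It is not part of the patch; it assumes an existing SparkSession named `spark` and a scratch directory path `dir`, both placeholder names.

import java.time.LocalDateTime

import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

// Make TIMESTAMP_NTZ the default timestamp type, as in the new tests.
spark.conf.set(SQLConf.TIMESTAMP_TYPE.key, "TIMESTAMP_NTZ")

// Build a one-row DataFrame whose partition column `ts` is TIMESTAMP_NTZ.
val schema = StructType(Seq(
  StructField("ts", TimestampNTZType),
  StructField("i", StringType)))
val row = Row(LocalDateTime.parse("1990-02-24T12:00:30"), "not a partition column")
val df = spark.createDataFrame(spark.sparkContext.parallelize(row :: Nil), schema)

// `ts` becomes a partition directory such as .../ts=1990-02-24 12%3A00%3A30/
df.write.format("parquet").partitionBy("ts").save(dir)

// 1) No user-provided schema: the timestamp-looking partition value is inferred and parsed
//    as TimestampNTZType because spark.sql.timestampType is TIMESTAMP_NTZ.
val inferred = spark.read.parquet(dir)
assert(inferred.schema("ts").dataType == TimestampNTZType)

// 2) User-provided partition schema: a TIMESTAMP_NTZ partition column is parsed as well.
val providedSchema = new StructType().add("i", StringType).add("ts", TimestampNTZType)
val provided = spark.read.schema(providedSchema).parquet(dir)
assert(provided.schema("ts").dataType == TimestampNTZType)

Under type inference the resolved type of the `ts` directory values follows `spark.sql.timestampType` (the new `SQLConf.get.timestampType` branch in PartitioningUtils), while a user-provided schema goes through the new `AnyTimestampType` case when casting the unescaped path value.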