[SPARK-36135][SQL] Support TimestampNTZ type in file partitioning

### What changes were proposed in this pull request?

Support the TimestampNTZ type in file partitioning:
* When no schema is provided and the default timestamp type is TimestampNTZ, Spark should infer and parse timestamp-valued partition columns as TimestampNTZ (a usage sketch follows the list below).
* When the provided partition schema contains a TimestampNTZ column, Spark should be able to parse the partition values of that column as TimestampNTZ.
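For illustration, here is a minimal, hedged sketch of the inference case (the path, column name, and data are made up; `spark.sql.timestampType` is the SQL config behind `SQLConf.TIMESTAMP_TYPE`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Make TimestampNTZ the default timestamp type before reading.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

// Assume /tmp/events contains partition directories such as
//   /tmp/events/ts=2021-07-15 12%3A00%3A00/part-00000.parquet
val df = spark.read.parquet("/tmp/events")
df.printSchema()  // the inferred partition column `ts` should be TimestampNTZType
```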

### Why are the changes needed?

File partitioning is an important feature, and Spark should support the TimestampNTZ type in it.

### Does this PR introduce _any_ user-facing change?

Yes. Spark now supports the TimestampNTZ type in file partitioning.

### How was this patch tested?

Unit tests

Closes #33344 from gengliangwang/partition.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 96c2919988)
Signed-off-by: Gengliang Wang <gengliang@apache.org>


@@ -867,6 +867,9 @@ object TypeCoercion extends TypeCoercionBase {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)
case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
Some(TimestampNTZType)
case (t1, t2) => findTypeForComplex(t1, t2, findTightestCommonType)
}
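The rule above lets a date partition value and an NTZ timestamp partition value resolve to `TimestampNTZType` as their tightest common type. A rough sketch of the effect (reusing the `spark` session from the sketch above; directory layout is illustrative, mirroring the test later in this diff):

```scala
import org.apache.spark.sql.types.TimestampNTZType

// Partition directories with conflicting inferred types for column `a`:
//   /path/a=2014-01-01/...                -> DateType
//   /path/a=2014-01-01 00%3A01%3A00.0/... -> TimestampNTZType (with NTZ as the default)
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val df = spark.read.parquet("/path")
// The common type picked for `a` should be TimestampNTZType, not TimestampType.
assert(df.schema("a").dataType == TimestampNTZType)
```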


@@ -94,7 +94,7 @@ public class ColumnVectorUtils {
col.getChild(1).putLongs(0, capacity, c.microseconds);
} else if (t instanceof DateType) {
col.putInts(0, capacity, row.getInt(fieldIdx));
} else if (t instanceof TimestampType) {
} else if (t instanceof TimestampType || t instanceof TimestampNTZType) {
col.putLongs(0, capacity, row.getLong(fieldIdx));
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
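Both timestamp types share the same physical representation here, a 64-bit microsecond value, which is why the `putLongs` branch can cover `TimestampNTZType` as well. A small sketch of that mapping, using the catalyst helper that the test suite below imports:

```scala
import java.time.LocalDateTime
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// A TimestampNTZ value is stored as microseconds since 1970-01-01T00:00:00,
// with no time zone adjustment applied.
val micros: Long = DateTimeUtils.localDateTimeToMicros(LocalDateTime.parse("1990-02-24T12:00:30"))
```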


@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionVa
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.unsafe.types.UTF8String
@@ -480,14 +481,20 @@ object PartitioningUtils {
val timestampTry = Try {
val unescapedRaw = unescapePathName(raw)
// the inferred data type is consistent with the default timestamp type
val timestampType = SQLConf.get.timestampType
// try and parse the date, if no exception occurs this is a candidate to be resolved as
// TimestampType
timestampFormatter.parse(unescapedRaw)
// TimestampType or TimestampNTZType
timestampType match {
case TimestampType => timestampFormatter.parse(unescapedRaw)
case TimestampNTZType => timestampFormatter.parseWithoutTimeZone(unescapedRaw)
}
// SPARK-23436: see comment for date
val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(zoneId.getId)).eval()
val timestampValue = Cast(Literal(unescapedRaw), timestampType, Some(zoneId.getId)).eval()
// Disallow TimestampType if the cast returned null
require(timestampValue != null)
TimestampType
timestampType
}
if (typeInference) {
@@ -522,11 +529,12 @@ object PartitioningUtils {
case _: DecimalType => Literal(new JBigDecimal(value)).value
case DateType =>
Cast(Literal(value), DateType, Some(zoneId.getId)).eval()
case TimestampType =>
// Timestamp types
case dt if AnyTimestampType.acceptsType(dt) =>
Try {
Cast(Literal(unescapePathName(value)), TimestampType, Some(zoneId.getId)).eval()
Cast(Literal(unescapePathName(value)), dt, Some(zoneId.getId)).eval()
}.getOrElse {
Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), TimestampType).eval()
Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), dt).eval()
}
case dt => throw QueryExecutionErrors.typeUnsupportedError(dt)
}
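The `AnyTimestampType` case above is what makes a user-provided partition schema with a TimestampNTZ column work (the second bullet in the description). A hedged sketch of that path, with an illustrative layout and column names:

```scala
import org.apache.spark.sql.types._

// Assume data laid out as /warehouse/events/ts=2021-07-15 12%3A00%3A00/part-*.parquet
val providedSchema = new StructType()
  .add("value", StringType)
  .add("ts", TimestampNTZType)  // partition column declared as TimestampNTZ

// The partition value is cast to TimestampNTZ instead of being rejected as unsupported.
val events = spark.read.schema(providedSchema).parquet("/warehouse/events")
```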


@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import java.math.BigInteger
import java.sql.Timestamp
import java.time.{ZoneId, ZoneOffset}
import java.util.{Calendar, Locale}
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.util.Locale
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
@@ -32,12 +32,13 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.TimestampTypes
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -82,12 +83,13 @@ abstract class ParquetPartitionDiscoverySuite
check("1.5", DoubleType)
check("hello", StringType)
check("1990-02-24", DateType)
check("1990-02-24 12:00:30", TimestampType)
val c = Calendar.getInstance(TimeZoneUTC)
c.set(1990, 1, 24, 12, 0, 30)
c.set(Calendar.MILLISECOND, 0)
check("1990-02-24 12:00:30", TimestampType, ZoneOffset.UTC)
// The inferred timestamp type is consistent with the value of `SQLConf.TIMESTAMP_TYPE`
Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
check("1990-02-24 12:00:30", SQLConf.get.timestampType)
check("1990-02-24 12:00:30", SQLConf.get.timestampType, ZoneOffset.UTC)
}
}
check(defaultPartitionName, NullType)
}
@@ -366,31 +368,44 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)
// The cases below check the resolution for type conflicts.
val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
// Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set
// as a common type.
// Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0)
// is set as a common type.
check(Seq(
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
StructField("a", TimestampType),
StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
InternalRow(null, Decimal(0)),
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
Partition(
InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
Partition(
InternalRow(t2, null),
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"))))
// The inferred timestamp type is consistent with the value of `SQLConf.TIMESTAMP_TYPE`
Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
// The cases below check the resolution for type conflicts.
val t1 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
} else {
localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:00:00"))
}
val t2 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
} else {
localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00"))
}
// Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set
// as a common type.
// Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0)
// is set as a common type.
check(Seq(
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
StructField("a", SQLConf.get.timestampType),
StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
InternalRow(null, Decimal(0)),
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
Partition(
InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
Partition(
InternalRow(t2, null),
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"))))
}
}
}
test("parse partitions with type inference disabled") {
@@ -642,97 +657,121 @@ abstract class ParquetPartitionDiscoverySuite
}
test("Various partition value types") {
val row =
Row(
100.toByte,
40000.toShort,
Int.MaxValue,
Long.MaxValue,
1.5.toFloat,
4.5,
new java.math.BigDecimal(new BigInteger("212500"), 5),
new java.math.BigDecimal("2.125"),
java.sql.Date.valueOf("2015-05-23"),
new Timestamp(0),
"This is a string, /[]?=:",
"This is not a partition column")
Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
new Timestamp(0)
} else {
LocalDateTime.parse("1970-01-01T00:00:00")
}
val row =
Row(
100.toByte,
40000.toShort,
Int.MaxValue,
Long.MaxValue,
1.5.toFloat,
4.5,
new java.math.BigDecimal(new BigInteger("212500"), 5),
new java.math.BigDecimal("2.125"),
java.sql.Date.valueOf("2015-05-23"),
ts,
"This is a string, /[]?=:",
"This is not a partition column")
// BooleanType is not supported yet
val partitionColumnTypes =
Seq(
ByteType,
ShortType,
IntegerType,
LongType,
FloatType,
DoubleType,
DecimalType(10, 5),
DecimalType.SYSTEM_DEFAULT,
DateType,
TimestampType,
StringType)
// BooleanType is not supported yet
val partitionColumnTypes =
Seq(
ByteType,
ShortType,
IntegerType,
LongType,
FloatType,
DoubleType,
DecimalType(10, 5),
DecimalType.SYSTEM_DEFAULT,
DateType,
SQLConf.get.timestampType,
StringType)
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
case (t, index) => StructField(s"p_$index", t)
}
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
case (t, index) => StructField(s"p_$index", t)
}
val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
withTempPath { dir =>
df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
}
withTempPath { dir =>
df.write
.format("parquet")
.partitionBy(partitionColumns.map(_.name): _*)
.save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
}
withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.load(dir.toString).select(fields: _*), row)
withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.load(dir.toString).select(fields: _*), row)
}
}
}
}
test("Various inferred partition value types") {
val row =
Row(
Long.MaxValue,
4.5,
new java.math.BigDecimal(new BigInteger("1" * 20)),
java.sql.Date.valueOf("2015-05-23"),
java.sql.Timestamp.valueOf("1990-02-24 12:00:30"),
"This is a string, /[]?=:",
"This is not a partition column")
Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
Timestamp.valueOf("1990-02-24 12:00:30")
} else {
LocalDateTime.parse("1990-02-24T12:00:30")
}
val row =
Row(
Long.MaxValue,
4.5,
new java.math.BigDecimal(new BigInteger("1" * 20)),
java.sql.Date.valueOf("2015-05-23"),
ts,
"This is a string, /[]?=:",
"This is not a partition column")
val partitionColumnTypes =
Seq(
LongType,
DoubleType,
DecimalType(20, 0),
DateType,
TimestampType,
StringType)
val partitionColumnTypes =
Seq(
LongType,
DoubleType,
DecimalType(20, 0),
DateType,
SQLConf.get.timestampType,
StringType)
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
case (t, index) => StructField(s"p_$index", t)
}
val partitionColumns = partitionColumnTypes.zipWithIndex.map {
case (t, index) => StructField(s"p_$index", t)
}
val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
withTempPath { dir =>
df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
}
withTempPath { dir =>
df.write
.format("parquet")
.partitionBy(partitionColumns.map(_.name): _*)
.save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
}
withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.load(dir.toString).select(fields: _*), row)
withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
.load(dir.toString).select(fields: _*), row)
}
}
}
}
@@ -1346,4 +1385,4 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
}
}
}
}
}