diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a04f877807..831ef62d74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -285,8 +285,24 @@ object SQLConf { .booleanConf .createWithDefault(true) + object ParquetOutputTimestampType extends Enumeration { + val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value + } + + val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType") + .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " + + "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " + + "is a standard timestamp type in Parquet, which stores number of microseconds from the " + + "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " + + "means Spark has to truncate the microsecond portion of its timestamp value.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(ParquetOutputTimestampType.values.map(_.toString)) + .createWithDefault(ParquetOutputTimestampType.INT96.toString) + val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis") - .doc("When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " + + .doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " + + "When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " + "extended type. In this mode, the microsecond portion of the timestamp value will be" + "truncated.") .booleanConf @@ -1143,6 +1159,18 @@ class SQLConf extends Serializable with Logging { def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS) + def parquetOutputTimestampType: ParquetOutputTimestampType.Value = { + val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key) + if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) { + // If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set, + // respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise, + // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority. 
+ ParquetOutputTimestampType.TIMESTAMP_MILLIS + } else { + ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE)) + } + } + def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT) def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 5a810cae1e..80c2f491b4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -197,8 +197,6 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader extends RecordReader columns = requestedSchema.getColumns(); + List types = requestedSchema.asGroupType().getFields(); columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; - columnReaders[i] = new VectorizedColumnReader(columns.get(i), - pages.getPageReader(columns.get(i))); + columnReaders[i] = new VectorizedColumnReader( + columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i))); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 61bd65dd48..a48f8d517b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -111,23 +111,15 @@ class ParquetFileFormat // This metadata is only useful for detecting optional columns when pushdowning filters. ParquetWriteSupport.setSchema(dataSchema, conf) - // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) - // and `CatalystWriteSupport` (writing actual rows to Parquet files). - conf.set( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sparkSession.sessionState.conf.isParquetBinaryAsString.toString) - - conf.set( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp.toString) - + // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet + // schema and writes actual rows to Parquet files. 
conf.set( SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sparkSession.sessionState.conf.writeLegacyParquetFormat.toString) conf.set( - SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key, - sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString) + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, + sparkSession.sessionState.conf.parquetOutputTimestampType.toString) // Sets compression scheme conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) @@ -312,16 +304,13 @@ class ParquetFileFormat ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) - // Sets flags for `CatalystSchemaConverter` + // Sets flags for `ParquetToSparkSchemaConverter` hadoopConf.setBoolean( SQLConf.PARQUET_BINARY_AS_STRING.key, sparkSession.sessionState.conf.isParquetBinaryAsString) hadoopConf.setBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sparkSession.sessionState.conf.isParquetINT96AsTimestamp) - hadoopConf.setBoolean( - SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key, - sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis) // Try to push down filters when filter push-down is enabled. val pushed = @@ -428,15 +417,9 @@ object ParquetFileFormat extends Logging { private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { - def parseParquetSchema(schema: MessageType): StructType = { - val converter = new ParquetSchemaConverter( - sparkSession.sessionState.conf.isParquetBinaryAsString, - sparkSession.sessionState.conf.isParquetBinaryAsString, - sparkSession.sessionState.conf.writeLegacyParquetFormat, - sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis) - - converter.convert(schema) - } + val converter = new ParquetToSparkSchemaConverter( + sparkSession.sessionState.conf.isParquetBinaryAsString, + sparkSession.sessionState.conf.isParquetINT96AsTimestamp) val seen = mutable.HashSet[String]() val finalSchemas: Seq[StructType] = footers.flatMap { footer => @@ -447,7 +430,7 @@ object ParquetFileFormat extends Logging { .get(ParquetReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to Parquet schema if no Spark SQL schema found. - Some(parseParquetSchema(metadata.getSchema)) + Some(converter.convert(metadata.getSchema)) } else if (!seen.contains(serializedSchema.get)) { seen += serializedSchema.get @@ -470,7 +453,7 @@ object ParquetFileFormat extends Logging { .map(_.asInstanceOf[StructType]) .getOrElse { // Falls back to Parquet schema if Spark SQL schema can't be parsed. - parseParquetSchema(metadata.getSchema) + converter.convert(metadata.getSchema) }) } else { None @@ -538,8 +521,6 @@ object ParquetFileFormat extends Logging { sparkSession: SparkSession): Option[StructType] = { val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp - val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis - val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) // !! HACK ALERT !! 
@@ -579,13 +560,9 @@ object ParquetFileFormat extends Logging { serializedConf.value, fakeFileStatuses, ignoreCorruptFiles) // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` - val converter = - new ParquetSchemaConverter( - assumeBinaryIsString = assumeBinaryIsString, - assumeInt96IsTimestamp = assumeInt96IsTimestamp, - writeLegacyParquetFormat = writeLegacyParquetFormat, - writeTimestampInMillis = writeTimestampInMillis) - + val converter = new ParquetToSparkSchemaConverter( + assumeBinaryIsString = assumeBinaryIsString, + assumeInt96IsTimestamp = assumeInt96IsTimestamp) if (footers.isEmpty) { Iterator.empty } else { @@ -625,7 +602,7 @@ object ParquetFileFormat extends Logging { * a [[StructType]] converted from the [[MessageType]] stored in this footer. */ def readSchemaFromFooter( - footer: Footer, converter: ParquetSchemaConverter): StructType = { + footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index f1a35dd8a6..2854cb1bc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -95,7 +95,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), - new ParquetSchemaConverter(conf)) + new ParquetToSparkSchemaConverter(conf)) } } @@ -270,7 +270,7 @@ private[parquet] object ParquetReadSupport { private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType): Seq[Type] = { val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false) + val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false) structType.map { f => parquetFieldMap .get(f.name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index 4e49a0dac9..793755e9aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -31,7 +31,9 @@ import org.apache.spark.sql.types.StructType * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters */ private[parquet] class ParquetRecordMaterializer( - parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter) + parquetSchema: MessageType, + catalystSchema: StructType, + schemaConverter: ParquetToSparkSchemaConverter) extends RecordMaterializer[UnsafeRow] { private val rootConverter = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 32e6c60cd9..10f6c3b4f1 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -27,7 +27,7 @@ import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow @@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( - schemaConverter: ParquetSchemaConverter, + schemaConverter: ParquetToSparkSchemaConverter, parquetType: GroupType, catalystType: StructType, updater: ParentContainerUpdater) @@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter( case StringType => new ParquetStringConverter(updater) + case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS => + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + updater.setLong(value) + } + } + case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { @@ -259,8 +266,8 @@ private[parquet] class ParquetRowConverter( } } - case TimestampType => - // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + // INT96 timestamp doesn't have a logical type, here we check the physical type instead. + case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 => new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index cd384d17d0..c61be077d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -30,49 +30,31 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ + /** - * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and - * vice versa. + * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]]. * * Parquet format backwards-compatibility rules are respected when converting Parquet * [[MessageType]] schemas. * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - * @constructor - * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL - * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. 
This argument only affects Parquet read path. - * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL - * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which - * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` - * described in Parquet format spec. This argument only affects Parquet read path. - * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 - * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. - * When set to false, use standard format defined in parquet-format spec. This argument only - * affects Parquet write path. - * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical - * type TIMESTAMP_MILLIS. * + * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL + * [[StringType]] fields. + * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL + * [[TimestampType]] fields. */ -private[parquet] class ParquetSchemaConverter( +class ParquetToSparkSchemaConverter( assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get, - writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) { + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, - assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, - writeLegacyParquetFormat = conf.writeLegacyParquetFormat, - writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis) + assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp) def this(conf: Configuration) = this( assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, - assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, - writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean, - writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean) + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean) /** @@ -165,6 +147,7 @@ private[parquet] class ParquetSchemaConverter( case INT_64 | null => LongType case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS) case UINT_64 => typeNotSupported() + case TIMESTAMP_MICROS => TimestampType case TIMESTAMP_MILLIS => TimestampType case _ => illegalType() } @@ -310,6 +293,30 @@ private[parquet] class ParquetSchemaConverter( repeatedType.getName == s"${parentName}_tuple" } } +} + +/** + * This converter class is used to convert Spark SQL [[StructType]] to Parquet [[MessageType]]. + * + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 + * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. + * When set to false, use standard format defined in parquet-format spec. This argument only + * affects Parquet write path. + * @param outputTimestampType which parquet timestamp type to use when writing. 
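+ *   Possible values are INT96, TIMESTAMP_MICROS and TIMESTAMP_MILLIS, as defined by
+ *   [[SQLConf.ParquetOutputTimestampType]].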
+ */ +class SparkToParquetSchemaConverter( + writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get, + outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = + SQLConf.ParquetOutputTimestampType.INT96) { + + def this(conf: SQLConf) = this( + writeLegacyParquetFormat = conf.writeLegacyParquetFormat, + outputTimestampType = conf.parquetOutputTimestampType) + + def this(conf: Configuration) = this( + writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean, + outputTimestampType = SQLConf.ParquetOutputTimestampType.withName( + conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key))) /** * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. @@ -363,7 +370,9 @@ private[parquet] class ParquetSchemaConverter( case DateType => Types.primitive(INT32, repetition).as(DATE).named(field.name) - // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec. + // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or + // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the + // behavior same as before. // // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond // timestamp in Impala for some historical reasons. It's not recommended to be used for any @@ -372,23 +381,18 @@ private[parquet] class ParquetSchemaConverter( // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. // // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting - // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store - // a timestamp into a `Long`. This design decision is subject to change though, for example, - // we may resort to microsecond precision in the future. - // - // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's - // currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using) - // hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will - // encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if - // 'spark.sql.parquet.int64AsTimestampMillis' is set. - // - // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that. - - case TimestampType if writeTimestampInMillis => - Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name) - + // from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can + // store a timestamp into a `Long`. This design decision is subject to change though, for + // example, we may resort to nanosecond precision in the future. 
case TimestampType => - Types.primitive(INT96, repetition).named(field.name) + outputTimestampType match { + case SQLConf.ParquetOutputTimestampType.INT96 => + Types.primitive(INT96, repetition).named(field.name) + case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => + Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name) + case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => + Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name) + } case BinaryType => Types.primitive(BINARY, repetition).named(field.name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 63a8666f0d..af4e1433c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -66,8 +66,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions private var writeLegacyParquetFormat: Boolean = _ - // Whether to write timestamp value with milliseconds precision. - private var writeTimestampInMillis: Boolean = _ + // Which parquet timestamp type to use when writing. + private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _ // Reusable byte array used to write timestamps as Parquet INT96 values private val timestampBuffer = new Array[Byte](12) @@ -84,15 +84,15 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean } - this.writeTimestampInMillis = { - assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null) - configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean + this.outputTimestampType = { + val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key + assert(configuration.get(key) != null) + SQLConf.ParquetOutputTimestampType.withName(configuration.get(key)) } - this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter] - val messageType = new ParquetSchemaConverter(configuration).convert(schema) + val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema) val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava logInfo( @@ -163,25 +163,23 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit recordConsumer.addBinary( Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes)) - case TimestampType if writeTimestampInMillis => - (row: SpecializedGetters, ordinal: Int) => - val millis = DateTimeUtils.toMillis(row.getLong(ordinal)) - recordConsumer.addLong(millis) - case TimestampType => - (row: SpecializedGetters, ordinal: Int) => { - // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it - // Currently we only support timestamps stored as INT96, which is compatible with Hive - // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS` - // defined in the parquet-format spec. But up until writing, the most recent parquet-mr - // version (1.8.1) hasn't implemented it yet. 
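+        // The Parquet representation is selected by spark.sql.parquet.outputTimestampType:
+        // INT96 keeps the legacy 12-byte Julian-day binary encoding, TIMESTAMP_MICROS writes
+        // the microsecond value as-is, and TIMESTAMP_MILLIS truncates it to milliseconds.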
+ outputTimestampType match { + case SQLConf.ParquetOutputTimestampType.INT96 => + (row: SpecializedGetters, ordinal: Int) => + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val buf = ByteBuffer.wrap(timestampBuffer) + buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) + recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) - // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond - // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) - val buf = ByteBuffer.wrap(timestampBuffer) - buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) - recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) + case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getLong(ordinal)) + + case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => + (row: SpecializedGetters, ordinal: Int) => + val millis = DateTimeUtils.toMillis(row.getLong(ordinal)) + recordConsumer.addLong(millis) } case BinaryType => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index d76990b482..633cfde6ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -110,12 +110,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { | required binary h(DECIMAL(32,0)); | required fixed_len_byte_array(32) i(DECIMAL(32,0)); | required int64 j(TIMESTAMP_MILLIS); + | required int64 k(TIMESTAMP_MICROS); |} """.stripMargin) val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0), - TimestampType) + TimestampType, TimestampType) withTempPath { location => val path = new Path(location.getCanonicalPath) @@ -380,7 +381,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val expectedSchema = new ParquetSchemaConverter().convert(schema) + val expectedSchema = new SparkToParquetSchemaConverter().convert(schema) val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index e822e40b14..4c8c9ef6e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -235,6 +235,28 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } + test("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS") { + val data = (1 to 10).map { i => + val ts = new java.sql.Timestamp(i) + ts.setNanos(2000) + Row(i, ts) + } + val schema = 
StructType(List(StructField("d", IntegerType, false), + StructField("time", TimestampType, false)).toArray) + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { + withTempPath { file => + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + df.write.parquet(file.getCanonicalPath) + ("true" :: "false" :: Nil).foreach { vectorized => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + val df2 = spark.read.parquet(file.getCanonicalPath) + checkAnswer(df2, df.collect().toSeq) + } + } + } + } + } + test("Enabling/disabling merging partfiles when merging parquet schema") { def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index ce992674d7..2cd2a600f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -24,6 +24,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -52,14 +53,10 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { sqlSchema: StructType, parquetSchema: String, binaryAsString: Boolean, - int96AsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean, - int64AsTimestampMillis: Boolean = false): Unit = { - val converter = new ParquetSchemaConverter( + int96AsTimestamp: Boolean): Unit = { + val converter = new ParquetToSparkSchemaConverter( assumeBinaryIsString = binaryAsString, - assumeInt96IsTimestamp = int96AsTimestamp, - writeLegacyParquetFormat = writeLegacyParquetFormat, - writeTimestampInMillis = int64AsTimestampMillis) + assumeInt96IsTimestamp = int96AsTimestamp) test(s"sql <= parquet: $testName") { val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) @@ -77,15 +74,12 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean, - int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean, - int64AsTimestampMillis: Boolean = false): Unit = { - val converter = new ParquetSchemaConverter( - assumeBinaryIsString = binaryAsString, - assumeInt96IsTimestamp = int96AsTimestamp, + outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = + SQLConf.ParquetOutputTimestampType.INT96): Unit = { + val converter = new SparkToParquetSchemaConverter( writeLegacyParquetFormat = writeLegacyParquetFormat, - writeTimestampInMillis = int64AsTimestampMillis) + outputTimestampType = outputTimestampType) test(s"sql => parquet: $testName") { val actual = converter.convert(sqlSchema) @@ -102,25 +96,22 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { binaryAsString: Boolean, int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean, - int64AsTimestampMillis: Boolean = false): Unit = { + outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = + SQLConf.ParquetOutputTimestampType.INT96): Unit = { testCatalystToParquet( testName, sqlSchema, parquetSchema, - binaryAsString, - 
int96AsTimestamp, writeLegacyParquetFormat, - int64AsTimestampMillis) + outputTimestampType) testParquetToCatalyst( testName, sqlSchema, parquetSchema, binaryAsString, - int96AsTimestamp, - writeLegacyParquetFormat, - int64AsTimestampMillis) + int96AsTimestamp) } } @@ -411,8 +402,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with nullable element type - 2", @@ -430,8 +420,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -446,8 +435,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 2", @@ -462,8 +450,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 3", @@ -476,8 +463,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 4", @@ -500,8 +486,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style", @@ -522,8 +507,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style", @@ -544,8 +528,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 7 - " + @@ -557,8 +540,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 8 - " + @@ -580,8 +562,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) // ======================================================= // Tests for converting Catalyst ArrayType to Parquet LIST @@ -602,8 +583,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = false) testCatalystToParquet( @@ -621,8 +600,6 @@ class 
ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = true) testCatalystToParquet( @@ -640,8 +617,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = false) testCatalystToParquet( @@ -657,8 +632,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = true) // ==================================================== @@ -682,8 +655,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 2", @@ -702,8 +674,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", @@ -722,8 +693,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -742,8 +712,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 2", @@ -762,8 +731,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style", @@ -782,8 +750,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, binaryAsString = true, - int96AsTimestamp = true, - writeLegacyParquetFormat = true) + int96AsTimestamp = true) // ==================================================== // Tests for converting Catalyst MapType to Parquet Map @@ -805,8 +772,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = false) testCatalystToParquet( @@ -825,8 +790,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = true) testCatalystToParquet( @@ -845,8 +808,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = false) testCatalystToParquet( @@ -865,8 +826,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - binaryAsString = true, - int96AsTimestamp = true, writeLegacyParquetFormat = true) // ================================= @@ -982,7 +941,19 @@ class ParquetSchemaSuite extends ParquetSchemaTest { binaryAsString = true, int96AsTimestamp = false, writeLegacyParquetFormat = true, - int64AsTimestampMillis = true) + outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS) + + 
testSchema( + "Timestamp written and read as INT64 with TIMESTAMP_MICROS", + StructType(Seq(StructField("f1", TimestampType))), + """message root { + | optional INT64 f1 (TIMESTAMP_MICROS); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = false, + writeLegacyParquetFormat = true, + outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS) private def testSchemaClipping( testName: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 85efca3c4b..f05f5722af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -124,7 +124,7 @@ private[sql] trait ParquetTest extends SQLTestUtils { protected def writeMetadata( schema: StructType, path: Path, configuration: Configuration): Unit = { - val parquetSchema = new ParquetSchemaConverter().convert(schema) + val parquetSchema = new SparkToParquetSchemaConverter().convert(schema) val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index 205c303b6c..f9d75fc178 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -281,4 +281,32 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { assert(null == spark.conf.get("spark.sql.nonexistent", null)) assert("" == spark.conf.get("spark.sql.nonexistent", "")) } + + test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") { + spark.sessionState.conf.clear() + + // check default value + assert(spark.sessionState.conf.parquetOutputTimestampType == + SQLConf.ParquetOutputTimestampType.INT96) + + // PARQUET_INT64_AS_TIMESTAMP_MILLIS should be respected. + spark.sessionState.conf.setConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS, true) + assert(spark.sessionState.conf.parquetOutputTimestampType == + SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS) + + // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority over PARQUET_INT64_AS_TIMESTAMP_MILLIS + spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros") + assert(spark.sessionState.conf.parquetOutputTimestampType == + SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS) + spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96") + assert(spark.sessionState.conf.parquetOutputTimestampType == + SQLConf.ParquetOutputTimestampType.INT96) + + // test invalid conf value + intercept[IllegalArgumentException] { + spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid") + } + + spark.sessionState.conf.clear() + } }
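
Not part of the patch: a minimal usage sketch of the new option introduced above. The config key and its allowed values come from the change itself; everything else (running in spark-shell, the output path, the sample data) is illustrative.

// Run in spark-shell, where a SparkSession named `spark` is already in scope.
import java.sql.Timestamp

// Write timestamps using the standard INT64 / TIMESTAMP_MICROS logical type instead of INT96.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

val ts = Timestamp.valueOf("2017-01-01 00:00:00")
ts.setNanos(2000) // 2 microseconds: preserved by TIMESTAMP_MICROS, truncated by TIMESTAMP_MILLIS

import spark.implicits._
val df = Seq((1, ts)).toDF("id", "time")
df.write.mode("overwrite").parquet("/tmp/parquet_ts_micros_demo")

// Reading the data back yields the same microsecond-precision values.
spark.read.parquet("/tmp/parquet_ts_micros_demo").show(truncate = false)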