[SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS

## What changes were proposed in this pull request?

This PR enables Spark to read Parquet TIMESTAMP_MICROS values, and adds a new config that allows Spark to write timestamp values to Parquet as the TIMESTAMP_MICROS type.
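
For example (an illustrative session; the path and data are made up, and the config key is the one introduced by this PR):

```scala
import spark.implicits._

// Write timestamps as INT64 annotated with TIMESTAMP_MICROS instead of the default INT96.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

val df = Seq((1, java.sql.Timestamp.valueOf("2017-11-11 22:40:26.123456"))).toDF("id", "ts")
df.write.parquet("/tmp/ts_micros")

// Reading the file back preserves microsecond precision, with both the vectorized and
// the non-vectorized Parquet readers.
spark.read.parquet("/tmp/ts_micros").show(false)
```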

## How was this patch tested?

New tests (schema conversion, round-trip read/write, and config resolution).

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19702 from cloud-fan/parquet.
Committed by Wenchen Fan on 2017-11-11 22:40:26 +01:00 (commit 21a7bfd5c3, parent d6ee69e776).
15 changed files with 249 additions and 204 deletions


@ -285,8 +285,24 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
object ParquetOutputTimestampType extends Enumeration {
val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
}
val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
.doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
"INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
"is a standard timestamp type in Parquet, which stores number of microseconds from the " +
"Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
"means Spark has to truncate the microsecond portion of its timestamp value.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ParquetOutputTimestampType.values.map(_.toString))
.createWithDefault(ParquetOutputTimestampType.INT96.toString)
val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
.doc("When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
.doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " +
"When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
"extended type. In this mode, the microsecond portion of the timestamp value will be" +
"truncated.")
.booleanConf
@ -1143,6 +1159,18 @@ class SQLConf extends Serializable with Logging {
def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)
def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) {
// If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set,
// respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise,
// PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority.
ParquetOutputTimestampType.TIMESTAMP_MILLIS
} else {
ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
}
}
def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
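
For clarity, a quick sketch of the resulting precedence (the same behavior the SQLConfSuite test near the end of this diff exercises):

```scala
// Nothing set: the output timestamp type defaults to INT96.
// Only the deprecated flag set: TIMESTAMP_MILLIS is used.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")
// Once the new config is set explicitly, it takes precedence over the deprecated flag.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
```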


@ -197,8 +197,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
Configuration config = new Configuration();
config.set("spark.sql.parquet.binaryAsString", "false");
config.set("spark.sql.parquet.int96AsTimestamp", "false");
config.set("spark.sql.parquet.writeLegacyFormat", "false");
config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@ -224,7 +222,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
}
}
this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {


@ -26,6 +26,7 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@ -91,11 +92,15 @@ public class VectorizedColumnReader {
private final PageReader pageReader;
private final ColumnDescriptor descriptor;
private final OriginalType originalType;
public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
throws IOException {
public VectorizedColumnReader(
ColumnDescriptor descriptor,
OriginalType originalType,
PageReader pageReader) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.originalType = originalType;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@ -158,12 +163,12 @@ public class VectorizedColumnReader {
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
// Timestamp values encoded as INT64 can't be lazily decoded as we need to post process
// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
column.dataType() != DataTypes.TimestampType) ||
originalType != OriginalType.TIMESTAMP_MILLIS) ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@ -253,21 +258,21 @@ public class VectorizedColumnReader {
case INT64:
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
DecimalType.is64BitDecimalType(column.dataType()) ||
originalType == OriginalType.TIMESTAMP_MICROS) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
} else if (column.dataType() == DataTypes.TimestampType) {
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i,
DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
}
}
}
else {
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@ -377,10 +382,11 @@ public class VectorizedColumnReader {
private void readLongBatch(int rowId, int num, WritableColumnVector column) {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
DecimalType.is64BitDecimalType(column.dataType()) ||
originalType == OriginalType.TIMESTAMP_MICROS) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.TimestampType) {
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
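
A minimal sketch of why the TIMESTAMP_MILLIS branches above need post-processing while TIMESTAMP_MICROS columns join the plain-long fast path: Spark's internal TimestampType value is microseconds since the epoch (assumption: this is what `DateTimeUtils.fromMillis` amounts to on this read path).

```scala
// TIMESTAMP_MILLIS: stored as milliseconds, so scale up to Spark's microsecond longs.
def fromMillis(millis: Long): Long = millis * 1000L

// TIMESTAMP_MICROS: already in microseconds, so the INT64 value is copied through
// unchanged and the column can be decoded lazily like LongType / 64-bit decimals.
```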


@ -165,8 +165,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
// Columns 0,1: data columns
// Column 2: partitionValues[0]
// Column 3: partitionValues[1]
public void initBatch(MemoryMode memMode, StructType partitionColumns,
InternalRow partitionValues) {
public void initBatch(
MemoryMode memMode,
StructType partitionColumns,
InternalRow partitionValues) {
StructType batchSchema = new StructType();
for (StructField f: sparkSchema.fields()) {
batchSchema = batchSchema.add(f);
@ -281,11 +283,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
+ rowsReturned + " out of " + totalRowCount);
}
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<Type> types = requestedSchema.asGroupType().getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
pages.getPageReader(columns.get(i)));
columnReaders[i] = new VectorizedColumnReader(
columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)));
}
totalCountLoadedSoFar += pages.getRowCount();
}


@ -111,23 +111,15 @@ class ParquetFileFormat
// This metadata is only useful for detecting optional columns when pushdowning filters.
ParquetWriteSupport.setSchema(dataSchema, conf)
// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
conf.set(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString.toString)
conf.set(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp.toString)
// Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
// schema and writes actual rows to Parquet files.
conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
conf.set(
SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@ -312,16 +304,13 @@ class ParquetFileFormat
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `CatalystSchemaConverter`
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
// Try to push down filters when filter push-down is enabled.
val pushed =
@ -428,15 +417,9 @@ object ParquetFileFormat extends Logging {
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
def parseParquetSchema(schema: MessageType): StructType = {
val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.writeLegacyParquetFormat,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
converter.convert(schema)
}
val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@ -447,7 +430,7 @@ object ParquetFileFormat extends Logging {
.get(ParquetReadSupport.SPARK_METADATA_KEY)
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
Some(parseParquetSchema(metadata.getSchema))
Some(converter.convert(metadata.getSchema))
} else if (!seen.contains(serializedSchema.get)) {
seen += serializedSchema.get
@ -470,7 +453,7 @@ object ParquetFileFormat extends Logging {
.map(_.asInstanceOf[StructType])
.getOrElse {
// Falls back to Parquet schema if Spark SQL schema can't be parsed.
parseParquetSchema(metadata.getSchema)
converter.convert(metadata.getSchema)
})
} else {
None
@ -538,8 +521,6 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
// !! HACK ALERT !!
@ -579,13 +560,9 @@ object ParquetFileFormat extends Logging {
serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat,
writeTimestampInMillis = writeTimestampInMillis)
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
if (footers.isEmpty) {
Iterator.empty
} else {
@ -625,7 +602,7 @@ object ParquetFileFormat extends Logging {
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
footer: Footer, converter: ParquetSchemaConverter): StructType = {
footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData


@ -95,7 +95,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetSchemaConverter(conf))
new ParquetToSparkSchemaConverter(conf))
}
}
@ -270,7 +270,7 @@ private[parquet] object ParquetReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)


@ -31,7 +31,9 @@ import org.apache.spark.sql.types.StructType
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter)
extends RecordMaterializer[UnsafeRow] {
private val rootConverter =


@ -27,7 +27,7 @@ import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
schemaConverter: ParquetSchemaConverter,
schemaConverter: ParquetToSparkSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)
case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(value)
}
}
case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
@ -259,8 +266,8 @@ private[parquet] class ParquetRowConverter(
}
}
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// INT96 timestamp doesn't have a logical type, here we check the physical type instead.
case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
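
The INT96 branch is cut off at the end of this hunk; as a rough standalone sketch of what it decodes (assuming the 12-byte INT96 layout of 8-byte little-endian time-of-day nanoseconds followed by a 4-byte Julian day, with nanoseconds truncated to Spark's microsecond precision):

```scala
import java.nio.{ByteBuffer, ByteOrder}

def int96ToMicros(bytes: Array[Byte]): Long = {
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong
  val julianDay = buf.getInt
  val julianDayOfEpoch = 2440588L                // Julian day number of 1970-01-01
  val secondsPerDay = 24L * 60 * 60
  (julianDay - julianDayOfEpoch) * secondsPerDay * 1000000L + timeOfDayNanos / 1000L
}
```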


@ -30,49 +30,31 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
* This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
* vice versa.
* This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]].
*
* Parquet format backwards-compatibility rules are respected when converting Parquet
* [[MessageType]] schemas.
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
* @constructor
* @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
* [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
* [[StructType]]. This argument only affects Parquet read path.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
* [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
* has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
* described in Parquet format spec. This argument only affects Parquet read path.
* @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
* and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
* @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
* type TIMESTAMP_MILLIS.
*
* @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
* [[StringType]] fields.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields.
*/
private[parquet] class ParquetSchemaConverter(
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
/**
@ -165,6 +147,7 @@ private[parquet] class ParquetSchemaConverter(
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
case TIMESTAMP_MICROS => TimestampType
case TIMESTAMP_MILLIS => TimestampType
case _ => illegalType()
}
@ -310,6 +293,30 @@ private[parquet] class ParquetSchemaConverter(
repeatedType.getName == s"${parentName}_tuple"
}
}
}
/**
* This converter class is used to convert Spark SQL [[StructType]] to Parquet [[MessageType]].
*
* @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
* and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
* @param outputTimestampType which parquet timestamp type to use when writing.
*/
class SparkToParquetSchemaConverter(
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96) {
def this(conf: SQLConf) = this(
writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
outputTimestampType = conf.parquetOutputTimestampType)
def this(conf: Configuration) = this(
writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
/**
* Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
@ -363,7 +370,9 @@ private[parquet] class ParquetSchemaConverter(
case DateType =>
Types.primitive(INT32, repetition).as(DATE).named(field.name)
// NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
// NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or
// TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the
// behavior same as before.
//
// As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
// timestamp in Impala for some historical reasons. It's not recommended to be used for any
@ -372,23 +381,18 @@ private[parquet] class ParquetSchemaConverter(
// `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
//
// Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
// a timestamp into a `Long`. This design decision is subject to change though, for example,
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
// currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will
// encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if
// 'spark.sql.parquet.int64AsTimestampMillis' is set.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
case TimestampType if writeTimestampInMillis =>
Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
// from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can
// store a timestamp into a `Long`. This design decision is subject to change though, for
// example, we may resort to nanosecond precision in the future.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
outputTimestampType match {
case SQLConf.ParquetOutputTimestampType.INT96 =>
Types.primitive(INT96, repetition).named(field.name)
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
}
case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
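
A rough sketch of what each output mode produces for a single timestamp column (assumes a build containing this PR; the converter lives in `org.apache.spark.sql.execution.datasources.parquet`):

```scala
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val schema = StructType(Seq(StructField("ts", TimestampType)))
SQLConf.ParquetOutputTimestampType.values.toSeq.foreach { t =>
  val messageType = new SparkToParquetSchemaConverter(outputTimestampType = t).convert(schema)
  // Expected field (sketch):
  //   INT96            -> optional int96 ts;
  //   TIMESTAMP_MICROS -> optional int64 ts (TIMESTAMP_MICROS);
  //   TIMESTAMP_MILLIS -> optional int64 ts (TIMESTAMP_MILLIS);
  println(s"$t:\n$messageType")
}
```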


@ -66,8 +66,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _
// Whether to write timestamp value with milliseconds precision.
private var writeTimestampInMillis: Boolean = _
// Which parquet timestamp type to use when writing.
private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _
// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)
@ -84,15 +84,15 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
}
this.writeTimestampInMillis = {
assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
this.outputTimestampType = {
val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key
assert(configuration.get(key) != null)
SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
}
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
val messageType = new ParquetSchemaConverter(configuration).convert(schema)
val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
logInfo(
@ -163,25 +163,23 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
case TimestampType if writeTimestampInMillis =>
(row: SpecializedGetters, ordinal: Int) =>
val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
recordConsumer.addLong(millis)
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
// Currently we only support timestamps stored as INT96, which is compatible with Hive
// and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
// defined in the parquet-format spec. But up until writing, the most recent parquet-mr
// version (1.8.1) hasn't implemented it yet.
outputTimestampType match {
case SQLConf.ParquetOutputTimestampType.INT96 =>
(row: SpecializedGetters, ordinal: Int) =>
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
// NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
// precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))
case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
(row: SpecializedGetters, ordinal: Int) =>
val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
recordConsumer.addLong(millis)
}
case BinaryType =>
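
To round out this file's diff (the hunk trails off into the BinaryType case), a standalone sketch of what the INT96 branch above ends up writing, assuming it mirrors `DateTimeUtils.toJulianDay` plus the `ByteBuffer` packing shown: split Spark's microsecond timestamp into a Julian day and nanoseconds-of-day, then emit 12 little-endian bytes.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Sketch only; pre-epoch edge cases are handled by DateTimeUtils in the real code.
def microsToInt96(micros: Long): Array[Byte] = {
  val julianDayOfEpoch = 2440588L
  val microsPerDay = 24L * 60 * 60 * 1000000L
  val julianMicros = micros + julianDayOfEpoch * microsPerDay
  val julianDay = (julianMicros / microsPerDay).toInt
  val timeOfDayNanos = (julianMicros % microsPerDay) * 1000L
  ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    .putLong(timeOfDayNanos).putInt(julianDay).array()
}
```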


@ -110,12 +110,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
| required int64 j(TIMESTAMP_MILLIS);
| required int64 k(TIMESTAMP_MICROS);
|}
""".stripMargin)
val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
TimestampType)
TimestampType, TimestampType)
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@ -380,7 +381,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
val expectedSchema = new ParquetSchemaConverter().convert(schema)
val expectedSchema = new SparkToParquetSchemaConverter().convert(schema)
val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
actualSchema.checkContains(expectedSchema)


@ -235,6 +235,28 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
test("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS") {
val data = (1 to 10).map { i =>
val ts = new java.sql.Timestamp(i)
ts.setNanos(2000)
Row(i, ts)
}
val schema = StructType(List(StructField("d", IntegerType, false),
StructField("time", TimestampType, false)).toArray)
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
withTempPath { file =>
val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
df.write.parquet(file.getCanonicalPath)
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
val df2 = spark.read.parquet(file.getCanonicalPath)
checkAnswer(df2, df.collect().toSeq)
}
}
}
}
}
test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>


@ -24,6 +24,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@ -52,14 +53,10 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
sqlSchema: StructType,
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
int96AsTimestamp: Boolean): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat,
writeTimestampInMillis = int64AsTimestampMillis)
assumeInt96IsTimestamp = int96AsTimestamp)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@ -77,15 +74,12 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
testName: String,
sqlSchema: StructType,
parquetSchema: String,
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
int64AsTimestampMillis: Boolean = false): Unit = {
val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96): Unit = {
val converter = new SparkToParquetSchemaConverter(
writeLegacyParquetFormat = writeLegacyParquetFormat,
writeTimestampInMillis = int64AsTimestampMillis)
outputTimestampType = outputTimestampType)
test(s"sql => parquet: $testName") {
val actual = converter.convert(sqlSchema)
@ -102,25 +96,22 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
int64AsTimestampMillis: Boolean = false): Unit = {
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96): Unit = {
testCatalystToParquet(
testName,
sqlSchema,
parquetSchema,
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
int64AsTimestampMillis)
outputTimestampType)
testParquetToCatalyst(
testName,
sqlSchema,
parquetSchema,
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
int64AsTimestampMillis)
int96AsTimestamp)
}
}
@ -411,8 +402,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with nullable element type - 2",
@ -430,8 +420,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
@ -446,8 +435,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 2",
@ -462,8 +450,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 3",
@ -476,8 +463,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 4",
@ -500,8 +486,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
@ -522,8 +507,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
@ -544,8 +528,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 7 - " +
@ -557,8 +540,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 8 - " +
@ -580,8 +562,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
// =======================================================
// Tests for converting Catalyst ArrayType to Parquet LIST
@ -602,8 +583,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@ -621,8 +600,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
testCatalystToParquet(
@ -640,8 +617,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@ -657,8 +632,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
// ====================================================
@ -682,8 +655,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 2",
@ -702,8 +674,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
@ -722,8 +693,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 1 - standard",
@ -742,8 +712,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 2",
@ -762,8 +731,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
@ -782,8 +750,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
int96AsTimestamp = true)
// ====================================================
// Tests for converting Catalyst MapType to Parquet Map
@ -805,8 +772,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@ -825,8 +790,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
testCatalystToParquet(
@ -845,8 +808,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@ -865,8 +826,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
// =================================
@ -982,7 +941,19 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
binaryAsString = true,
int96AsTimestamp = false,
writeLegacyParquetFormat = true,
int64AsTimestampMillis = true)
outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
testSchema(
"Timestamp written and read as INT64 with TIMESTAMP_MICROS",
StructType(Seq(StructField("f1", TimestampType))),
"""message root {
| optional INT64 f1 (TIMESTAMP_MICROS);
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = false,
writeLegacyParquetFormat = true,
outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
private def testSchemaClipping(
testName: String,

View file

@ -124,7 +124,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
protected def writeMetadata(
schema: StructType, path: Path, configuration: Configuration): Unit = {
val parquetSchema = new ParquetSchemaConverter().convert(schema)
val parquetSchema = new SparkToParquetSchemaConverter().convert(schema)
val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)


@ -281,4 +281,32 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
assert(null == spark.conf.get("spark.sql.nonexistent", null))
assert("<undefined>" == spark.conf.get("spark.sql.nonexistent", "<undefined>"))
}
test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") {
spark.sessionState.conf.clear()
// check default value
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
// PARQUET_INT64_AS_TIMESTAMP_MILLIS should be respected.
spark.sessionState.conf.setConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS, true)
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
// PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority over PARQUET_INT64_AS_TIMESTAMP_MILLIS
spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
// test invalid conf value
intercept[IllegalArgumentException] {
spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid")
}
spark.sessionState.conf.clear()
}
}