[SPARK-34817][SQL] Read parquet unsigned types that stored as int32 physical type in parquet

### What changes were proposed in this pull request?

Unsigned types may be used to produce smaller in-memory representations of the data. These types used by frameworks(e.g. hive, pig) using parquet. And parquet will map them to its base types.

see more https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift

```thrift
  /**
   * An unsigned integer value.
   *
   * The number describes the maximum number of meaningful data bits in
   * the stored value. 8, 16 and 32 bit values are stored using the
   * INT32 physical type.  64 bit values are stored using the INT64
   * physical type.
   *
   */
  UINT_8 = 11;
  UINT_16 = 12;
  UINT_32 = 13;
  UINT_64 = 14;
```

```
UInt8-[0:255]
UInt16-[0:65535]
UInt32-[0:4294967295]
UInt64-[0:18446744073709551615]
```

In this PR, we support read UINT_8 as ShortType, UINT_16 as IntegerType, UINT_32 as LongType to fit their range. Support for UINT_64 will be in another PR.

### Why are the changes needed?

better parquet support

### Does this PR introduce _any_ user-facing change?

yes, we can read unit[8/16/32] from parquet files

### How was this patch tested?

new tests

Closes #31921 from yaooqinn/SPARK-34817.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Kent Yao 2021-03-25 06:58:06 +00:00 committed by Wenchen Fan
parent da04f1f4f8
commit 8c6748f691
8 changed files with 176 additions and 68 deletions

View file

@ -21,16 +21,16 @@ import org.apache.spark.sql.execution.vectorized.Dictionary;
public final class ParquetDictionary implements Dictionary {
private org.apache.parquet.column.Dictionary dictionary;
private boolean castLongToInt = false;
private boolean needTransform = false;
public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary, boolean castLongToInt) {
public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary, boolean needTransform) {
this.dictionary = dictionary;
this.castLongToInt = castLongToInt;
this.needTransform = needTransform;
}
@Override
public int decodeToInt(int id) {
if (castLongToInt) {
if (needTransform) {
return (int) dictionary.decodeToLong(id);
} else {
return dictionary.decodeToInt(id);
@ -39,7 +39,14 @@ public final class ParquetDictionary implements Dictionary {
@Override
public long decodeToLong(int id) {
return dictionary.decodeToLong(id);
if (needTransform) {
// For unsigned int32, it stores as dictionary encoded signed int32 in Parquet
// whenever dictionary is available.
// Here we lazily decode it to the original signed int value then convert to long(unit32).
return Integer.toUnsignedLong(dictionary.decodeToInt(id));
} else {
return dictionary.decodeToLong(id);
}
}
@Override

View file

@ -46,7 +46,6 @@ import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;
@ -279,16 +278,20 @@ public class VectorizedColumnReader {
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
PrimitiveType primitiveType = descriptor.getPrimitiveType();
if (primitiveType.getOriginalType() == OriginalType.DECIMAL &&
primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() &&
primitiveType.getPrimitiveTypeName() == INT64) {
// We need to make sure that we initialize the right type for the dictionary otherwise
// WritableColumnVector will throw an exception when trying to decode to an Int when the
// dictionary is in fact initialized as Long
column.setDictionary(new ParquetDictionary(dictionary, true));
} else {
column.setDictionary(new ParquetDictionary(dictionary, false));
}
// We need to make sure that we initialize the right type for the dictionary otherwise
// WritableColumnVector will throw an exception when trying to decode to an Int when the
// dictionary is in fact initialized as Long
boolean castLongToInt = primitiveType.getOriginalType() == OriginalType.DECIMAL &&
primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() &&
primitiveType.getPrimitiveTypeName() == INT64;
// We require a long value, but we need to use dictionary to decode the original
// signed int first
boolean isUnsignedInt32 = primitiveType.getOriginalType() == OriginalType.UINT_32;
column.setDictionary(
new ParquetDictionary(dictionary, castLongToInt || isUnsignedInt32));
} else {
decodeDictionaryIds(rowId, num, column, dictionaryIds);
}
@ -370,6 +373,18 @@ public class VectorizedColumnReader {
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
}
}
} else if (column.dataType() == DataTypes.LongType) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
// For unsigned int32, it stores as dictionary encoded signed int32 in Parquet
// whenever dictionary is available.
// Here we eagerly decode it to the original signed int value then convert to
// long(unit32).
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i,
Integer.toUnsignedLong(dictionary.decodeToInt(dictionaryIds.getDictId(i))));
}
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
@ -565,6 +580,12 @@ public class VectorizedColumnReader {
canReadAsIntDecimal(column.dataType())) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.LongType) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary fall backs.
// We read them as long values.
defColumn.readUnsignedIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

View file

@ -83,6 +83,15 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
@Override
public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, Integer.toUnsignedLong(buffer.getInt()));
}
}
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.

View file

@ -203,6 +203,41 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
// A fork of `readIntegers`, reading the signed integers as unsigned in long type
public void readUnsignedIntegers(
int total,
WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readUnsignedIntegers(n, c, rowId);
} else {
c.putNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, Integer.toUnsignedLong(data.readInteger()));
} else {
c.putNull(rowId + i);
}
}
break;
}
rowId += n;
left -= n;
currentCount -= n;
}
}
// A fork of `readIntegers`, which rebases the date int value (days) before filling
// the Spark column vector.
public void readIntegersWithRebase(
@ -602,6 +637,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
@Override
public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}
@Override
public void readIntegersWithRebase(
int total, WritableColumnVector c, int rowId, boolean failIfRebase) {

View file

@ -41,6 +41,7 @@ public interface VectorizedValuesReader {
void readBytes(int total, WritableColumnVector c, int rowId);
void readIntegers(int total, WritableColumnVector c, int rowId);
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readUnsignedIntegers(int total, WritableColumnVector c, int rowId);
void readLongs(int total, WritableColumnVector c, int rowId);
void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readFloats(int total, WritableColumnVector c, int rowId);

View file

@ -252,6 +252,11 @@ private[parquet] class ParquetRowConverter(
updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
catalystType match {
case LongType if parquetType.getOriginalType == OriginalType.UINT_32 =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setLong(Integer.toUnsignedLong(value))
}
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
new ParquetPrimitiveConverter(updater)

View file

@ -131,13 +131,11 @@ class ParquetToSparkSchemaConverter(
case INT32 =>
originalType match {
case INT_8 => ByteType
case INT_16 => ShortType
case INT_32 | null => IntegerType
case INT_16 | UINT_8 => ShortType
case INT_32 | UINT_16 | null => IntegerType
case DATE => DateType
case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
case UINT_8 => typeNotSupported()
case UINT_16 => typeNotSupported()
case UINT_32 => typeNotSupported()
case UINT_32 => LongType
case TIME_MILLIS => typeNotImplemented()
case _ => illegalType()
}

View file

@ -24,17 +24,16 @@ import scala.collection.mutable
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0
import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
@ -49,26 +48,6 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
// and members are private so we need to make our own in order to pass the schema
// to the writer.
private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
var groupWriter: GroupWriter = null
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
groupWriter = new GroupWriter(recordConsumer, schema)
}
override def init(configuration: Configuration): WriteContext = {
new WriteContext(schema, new java.util.HashMap[String, String]())
}
override def write(record: Group): Unit = {
groupWriter.write(record)
}
}
/**
* A test suite that tests basic Parquet I/O.
*/
@ -310,21 +289,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
test("SPARK-10113 Support for unsigned Parquet logical types") {
test("SPARK-34817: Support for unsigned Parquet logical types") {
val parquetSchema = MessageTypeParser.parseMessageType(
"""message root {
| required int32 c(UINT_32);
| required INT32 a(UINT_8);
| required INT32 b(UINT_16);
| required INT32 c(UINT_32);
|}
""".stripMargin)
val expectedSparkTypes = Seq(ShortType, IntegerType, LongType)
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
val conf = spark.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
spark.read.parquet(path.toString).printSchema()
}.toString
assert(errorMessage.contains("Parquet type not supported"))
val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType)
assert(sparkTypes === expectedSparkTypes)
}
}
@ -381,9 +362,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
checkCompressionCodec(CompressionCodecName.SNAPPY)
}
private def createParquetWriter(
schema: MessageType,
path: Path,
dictionaryEnabled: Boolean = false): ParquetWriter[Group] = {
val hadoopConf = spark.sessionState.newHadoopConf()
ExampleParquetWriter
.builder(path)
.withDictionaryEncoding(dictionaryEnabled)
.withType(schema)
.withWriterVersion(PARQUET_1_0)
.withCompressionCodec(GZIP)
.withRowGroupSize(1024 * 1024)
.withPageSize(1024)
.withConf(hadoopConf)
.build()
}
test("read raw Parquet file") {
def makeRawParquetFile(path: Path): Unit = {
val schema = MessageTypeParser.parseMessageType(
val schemaStr =
"""
|message root {
| required boolean _1;
@ -392,22 +391,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
| required float _4;
| required double _5;
|}
""".stripMargin)
""".stripMargin
val schema = MessageTypeParser.parseMessageType(schemaStr)
val testWriteSupport = new TestGroupWriteSupport(schema)
/**
* Provide a builder for constructing a parquet writer - after PARQUET-248 directly
* constructing the writer is deprecated and should be done through a builder. The default
* builders include Avro - but for raw Parquet writing we must create our own builder.
*/
class ParquetWriterBuilder() extends
ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
override def getWriteSupport(conf: Configuration) = testWriteSupport
override def self() = this
}
val writer = new ParquetWriterBuilder().build()
val writer = createParquetWriter(schema, path)
(0 until 10).foreach { i =>
val record = new SimpleGroup(schema)
@ -432,6 +420,45 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
test("SPARK-34817: Read UINT_8/UINT_16/UINT_32 from parquet") {
Seq(true, false).foreach { dictionaryEnabled =>
def makeRawParquetFile(path: Path): Unit = {
val schemaStr =
"""message root {
| required INT32 a(UINT_8);
| required INT32 b(UINT_16);
| required INT32 c(UINT_32);
|}
""".stripMargin
val schema = MessageTypeParser.parseMessageType(schemaStr)
val writer = createParquetWriter(schema, path, dictionaryEnabled)
val factory = new SimpleGroupFactory(schema)
(0 until 1000).foreach { i =>
val group = factory.newGroup()
.append("a", i % 100 + Byte.MaxValue)
.append("b", i % 100 + Short.MaxValue)
.append("c", i % 100 + Int.MaxValue)
writer.write(group)
}
writer.close()
}
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawParquetFile(path)
readParquetFile(path.toString) { df =>
checkAnswer(df, (0 until 1000).map { i =>
Row(i % 100 + Byte.MaxValue,
i % 100 + Short.MaxValue,
i % 100 + Int.MaxValue.toLong)
})
}
}
}
}
test("write metadata") {
val hadoopConf = spark.sessionState.newHadoopConf()
withTempPath { file =>