[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR.

**Committer who merges this PR should attribute it to "Damian Guy <damian.guygmail.com>".**

----

SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field.

One of the consequences is that, Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays.

This PR fixes this issue by

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`.

   Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulates elements in an `ArrayBuffer`.

   Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater`. So that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields, and propagate converted array values to upstream.

Author: Cheng Lian <lian@databricks.com>

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists
This commit is contained in:
Damian Guy 2015-08-11 12:46:33 +08:00 committed by Cheng Lian
parent 3c9802d940
commit 071bbad5db
14 changed files with 247 additions and 33 deletions

View file

@ -94,3 +94,4 @@ INDEX
gen-java.* gen-java.*
.*avpr .*avpr
org.apache.spark.sql.sources.DataSourceRegister org.apache.spark.sql.sources.DataSourceRegister
.*parquet

View file

@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder import java.nio.ByteOrder
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.OriginalType.LIST
import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, PrimitiveType, Type} import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
* values to an [[ArrayBuffer]]. * values to an [[ArrayBuffer]].
*/ */
private[parquet] trait ParentContainerUpdater { private[parquet] trait ParentContainerUpdater {
/** Called before a record field is being converted */
def start(): Unit = ()
/** Called after a record field is being converted */
def end(): Unit = ()
def set(value: Any): Unit = () def set(value: Any): Unit = ()
def setBoolean(value: Boolean): Unit = set(value) def setBoolean(value: Boolean): Unit = set(value)
def setByte(value: Byte): Unit = set(value) def setByte(value: Byte): Unit = set(value)
@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
/** A no-op updater used for root converter (who doesn't have a parent). */ /** A no-op updater used for root converter (who doesn't have a parent). */
private[parquet] object NoopUpdater extends ParentContainerUpdater private[parquet] object NoopUpdater extends ParentContainerUpdater
private[parquet] trait HasParentContainerUpdater {
def updater: ParentContainerUpdater
}
/**
* A convenient converter class for Parquet group types with an [[HasParentContainerUpdater]].
*/
private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
extends GroupConverter with HasParentContainerUpdater
/**
* Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
* are handled by this converter. Parquet primitive types are only a subset of those of Spark
* SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
*/
private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
extends PrimitiveConverter with HasParentContainerUpdater {
override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
override def addInt(value: Int): Unit = updater.setInt(value)
override def addLong(value: Long): Unit = updater.setLong(value)
override def addFloat(value: Float): Unit = updater.setFloat(value)
override def addDouble(value: Double): Unit = updater.setDouble(value)
override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
}
/** /**
* A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s. * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
* Since any Parquet record is also a struct, this converter can also be used as root converter. * Since any Parquet record is also a struct, this converter can also be used as root converter.
@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
parquetType: GroupType, parquetType: GroupType,
catalystType: StructType, catalystType: StructType,
updater: ParentContainerUpdater) updater: ParentContainerUpdater)
extends GroupConverter { extends CatalystGroupConverter(updater) {
/** /**
* Updater used together with field converters within a [[CatalystRowConverter]]. It propagates * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
/** /**
* Represents the converted row object once an entire Parquet record is converted. * Represents the converted row object once an entire Parquet record is converted.
*
* @todo Uses [[UnsafeRow]] for better performance.
*/ */
val currentRow = new SpecificMutableRow(catalystType.map(_.dataType)) val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
// Converters for each field. // Converters for each field.
private val fieldConverters: Array[Converter] = { private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.zip(catalystType).zipWithIndex.map { parquetType.getFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) => case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow` // Converted field value should be set to the `ordinal`-th cell of `currentRow`
@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter(
override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
override def end(): Unit = updater.set(currentRow) override def end(): Unit = {
var i = 0
while (i < currentRow.numFields) {
fieldConverters(i).updater.end()
i += 1
}
updater.set(currentRow)
}
override def start(): Unit = { override def start(): Unit = {
var i = 0 var i = 0
while (i < currentRow.numFields) { while (i < currentRow.numFields) {
fieldConverters(i).updater.start()
currentRow.setNullAt(i) currentRow.setNullAt(i)
i += 1 i += 1
} }
@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter(
private def newConverter( private def newConverter(
parquetType: Type, parquetType: Type,
catalystType: DataType, catalystType: DataType,
updater: ParentContainerUpdater): Converter = { updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
catalystType match { catalystType match {
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
new CatalystPrimitiveConverter(updater) new CatalystPrimitiveConverter(updater)
case ByteType => case ByteType =>
new PrimitiveConverter { new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = override def addInt(value: Int): Unit =
updater.setByte(value.asInstanceOf[ByteType#InternalType]) updater.setByte(value.asInstanceOf[ByteType#InternalType])
} }
case ShortType => case ShortType =>
new PrimitiveConverter { new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = override def addInt(value: Int): Unit =
updater.setShort(value.asInstanceOf[ShortType#InternalType]) updater.setShort(value.asInstanceOf[ShortType#InternalType])
} }
@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter(
case TimestampType => case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new PrimitiveConverter { new CatalystPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96 // Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = { override def addBinary(value: Binary): Unit = {
assert( assert(
@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter(
} }
case DateType => case DateType =>
new PrimitiveConverter { new CatalystPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = { override def addInt(value: Int): Unit = {
// DateType is not specialized in `SpecificMutableRow`, have to box it here. // DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(value.asInstanceOf[DateType#InternalType]) updater.set(value.asInstanceOf[DateType#InternalType])
} }
} }
// A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
// annotated by `LIST` or `MAP` should be interpreted as a required list of required
// elements where the element type is the type of the field.
case t: ArrayType if parquetType.getOriginalType != LIST =>
if (parquetType.isPrimitive) {
new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
} else {
new RepeatedGroupConverter(parquetType, t.elementType, updater)
}
case t: ArrayType => case t: ArrayType =>
new CatalystArrayConverter(parquetType.asGroupType(), t, updater) new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
@ -195,27 +243,11 @@ private[parquet] class CatalystRowConverter(
} }
} }
/**
* Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
* are handled by this converter. Parquet primitive types are only a subset of those of Spark
* SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
*/
private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
extends PrimitiveConverter {
override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
override def addInt(value: Int): Unit = updater.setInt(value)
override def addLong(value: Long): Unit = updater.setLong(value)
override def addFloat(value: Float): Unit = updater.setFloat(value)
override def addDouble(value: Double): Unit = updater.setDouble(value)
override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
}
/** /**
* Parquet converter for strings. A dictionary is used to minimize string decoding cost. * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
*/ */
private final class CatalystStringConverter(updater: ParentContainerUpdater) private final class CatalystStringConverter(updater: ParentContainerUpdater)
extends PrimitiveConverter { extends CatalystPrimitiveConverter(updater) {
private var expandedDictionary: Array[UTF8String] = null private var expandedDictionary: Array[UTF8String] = null
@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
private final class CatalystDecimalConverter( private final class CatalystDecimalConverter(
decimalType: DecimalType, decimalType: DecimalType,
updater: ParentContainerUpdater) updater: ParentContainerUpdater)
extends PrimitiveConverter { extends CatalystPrimitiveConverter(updater) {
// Converts decimals stored as INT32 // Converts decimals stored as INT32
override def addInt(value: Int): Unit = { override def addInt(value: Int): Unit = {
@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
parquetSchema: GroupType, parquetSchema: GroupType,
catalystSchema: ArrayType, catalystSchema: ArrayType,
updater: ParentContainerUpdater) updater: ParentContainerUpdater)
extends GroupConverter { extends CatalystGroupConverter(updater) {
private var currentArray: ArrayBuffer[Any] = _ private var currentArray: ArrayBuffer[Any] = _
@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
parquetType: GroupType, parquetType: GroupType,
catalystType: MapType, catalystType: MapType,
updater: ParentContainerUpdater) updater: ParentContainerUpdater)
extends GroupConverter { extends CatalystGroupConverter(updater) {
private var currentKeys: ArrayBuffer[Any] = _ private var currentKeys: ArrayBuffer[Any] = _
private var currentValues: ArrayBuffer[Any] = _ private var currentValues: ArrayBuffer[Any] = _
@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
} }
} }
} }
private trait RepeatedConverter {
private var currentArray: ArrayBuffer[Any] = _
protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
override def set(value: Any): Unit = currentArray += value
}
}
/**
* A primitive converter for converting unannotated repeated primitive values to required arrays
* of required primitives values.
*/
private final class RepeatedPrimitiveConverter(
parquetType: Type,
catalystType: DataType,
parentUpdater: ParentContainerUpdater)
extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
private val elementConverter: PrimitiveConverter =
newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
override def addInt(value: Int): Unit = elementConverter.addInt(value)
override def addLong(value: Long): Unit = elementConverter.addLong(value)
override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
}
/**
* A group converter for converting unannotated repeated group values to required arrays of
* required struct values.
*/
private final class RepeatedGroupConverter(
parquetType: Type,
catalystType: DataType,
parentUpdater: ParentContainerUpdater)
extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
private val elementConverter: GroupConverter =
newConverter(parquetType, catalystType, updater).asGroupConverter()
override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
override def end(): Unit = elementConverter.end()
override def start(): Unit = elementConverter.start()
}
} }

View file

@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter(
StructField(field.getName, convertField(field), nullable = false) StructField(field.getName, convertField(field), nullable = false)
case REPEATED => case REPEATED =>
throw new AnalysisException( // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
s"REPEATED not supported outside LIST or MAP. Type: $field") // annotated by `LIST` or `MAP` should be interpreted as a required list of required
// elements where the element type is the type of the field.
val arrayType = ArrayType(convertField(field), containsNull = false)
StructField(field.getName, arrayType, nullable = false)
} }
} }

Binary file not shown.

Binary file not shown.

View file

View file

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest {
override def sqlContext: SQLContext = TestSQLContext
private def readParquetProtobufFile(name: String): DataFrame = {
val url = Thread.currentThread().getContextClassLoader.getResource(name)
sqlContext.read.parquet(url.toString)
}
test("unannotated array of primitive type") {
checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
}
test("unannotated array of struct") {
checkAnswer(
readParquetProtobufFile("old-repeated-message.parquet"),
Row(
Seq(
Row("First inner", null, null),
Row(null, "Second inner", null),
Row(null, null, "Third inner"))))
checkAnswer(
readParquetProtobufFile("proto-repeated-struct.parquet"),
Row(
Seq(
Row("0 - 1", "0 - 2", "0 - 3"),
Row("1 - 1", "1 - 2", "1 - 3"))))
checkAnswer(
readParquetProtobufFile("proto-struct-with-array-many.parquet"),
Seq(
Row(
Seq(
Row("0 - 0 - 1", "0 - 0 - 2", "0 - 0 - 3"),
Row("0 - 1 - 1", "0 - 1 - 2", "0 - 1 - 3"))),
Row(
Seq(
Row("1 - 0 - 1", "1 - 0 - 2", "1 - 0 - 3"),
Row("1 - 1 - 1", "1 - 1 - 2", "1 - 1 - 3"))),
Row(
Seq(
Row("2 - 0 - 1", "2 - 0 - 2", "2 - 0 - 3"),
Row("2 - 1 - 1", "2 - 1 - 2", "2 - 1 - 3")))))
}
test("struct with unannotated array") {
checkAnswer(
readParquetProtobufFile("proto-struct-with-array.parquet"),
Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10))))
}
test("unannotated array of struct with unannotated array") {
checkAnswer(
readParquetProtobufFile("nested-array-struct.parquet"),
Seq(
Row(2, Seq(Row(1, Seq(Row(3))))),
Row(5, Seq(Row(4, Seq(Row(6))))),
Row(8, Seq(Row(7, Seq(Row(9)))))))
}
test("unannotated array of string") {
checkAnswer(
readParquetProtobufFile("proto-repeated-string.parquet"),
Seq(
Row(Seq("hello", "world")),
Row(Seq("good", "bye")),
Row(Seq("one", "two", "three"))))
}
}

View file

@ -585,6 +585,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|} |}
""".stripMargin) """.stripMargin)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 7 - " +
"parquet-protobuf primitive lists",
new StructType()
.add("f1", ArrayType(IntegerType, containsNull = false), nullable = false),
"""message root {
| repeated int32 f1;
|}
""".stripMargin)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 8 - " +
"parquet-protobuf non-primitive lists",
{
val elementType =
new StructType()
.add("c1", StringType, nullable = true)
.add("c2", IntegerType, nullable = false)
new StructType()
.add("f1", ArrayType(elementType, containsNull = false), nullable = false)
},
"""message root {
| repeated group f1 {
| optional binary c1 (UTF8);
| required int32 c2;
| }
|}
""".stripMargin)
// ======================================================= // =======================================================
// Tests for converting Catalyst ArrayType to Parquet LIST // Tests for converting Catalyst ArrayType to Parquet LIST
// ======================================================= // =======================================================