[SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520.

The exception is thrown only for thrift-generated parquet files. The array element schema name is assumed to be "array", as parquet-avro names it, but thrift-generated parquet files use array_name + "_tuple" instead. The requested array group type therefore ends up with no matching child, and the exception is raised when the parquet rows are materialized.
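
To make the mismatch concrete, here is an illustrative comparison in the same Parquet message-type notation the test below uses (the column name "numbers" is hypothetical). The avro-style layout Spark assumed unconditionally before this patch:

optional group numbers (LIST) {
  repeated int32 array;
}

The same column as written by parquet-thrift:

optional group numbers (LIST) {
  repeated int32 numbers_tuple;
}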

Author: Sadhan Sood <sadhan@tellapart.com>

Closes #4148 from sadhan/SPARK-4520 and squashes the following commits:

c5ccde8 [Sadhan Sood] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520.
Sadhan Sood authored on 2015-02-04 19:18:06 -08:00, committed by Cheng Lian
commit dba98bf698 (parent 1fbd124b1b)
4 changed files with 60 additions and 14 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

@@ -66,6 +66,11 @@ private[sql] object CatalystConverter {
   // Using a different value will result in Parquet silently dropping columns.
   val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
   val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+  // SPARK-4520: Thrift generated parquet files have different array element
+  // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
+  // as opposed to "array" used by default. For more information, check
+  // TestThriftSchemaConverter.java in parquet.thrift.
+  val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
   val MAP_KEY_SCHEMA_NAME = "key"
   val MAP_VALUE_SCHEMA_NAME = "value"
   val MAP_SCHEMA_NAME = "map"
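
A minimal sketch of how these constants combine (illustrative only; CatalystConverter is private[sql], so this compiles only from within the org.apache.spark.sql package hierarchy, and the column name "numbers" is hypothetical):

// Default avro-style element name: always the literal "array".
val defaultElementName = CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
// Thrift-style element name for a column "numbers": "numbers_tuple".
val thriftElementName = "numbers" + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX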

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

@@ -99,7 +99,11 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
     if (requestedAttributes != null) {
-      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+      // If the parquet file is thrift derived, there is a good chance that
+      // it will have the thrift class in metadata.
+      val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
+      parquetSchema = ParquetTypesConverter
+        .convertFromAttributes(requestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))
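
The detection is a plain key lookup in the Parquet footer's key/value metadata. A standalone sketch of the same check (in parquet's ReadSupport API, keyValueMetaData is a java.util.Map[String, String]):

def isThriftDerived(keyValueMetaData: java.util.Map[String, String]): Boolean =
  keyValueMetaData.keySet().contains("thrift.class")

parquet-thrift records the fully qualified Thrift class name under "thrift.class" when writing a file, so the key's presence is a reasonable, though heuristic, signal that thrift-style element names are in play.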

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

@@ -285,13 +285,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ctype: DataType,
       name: String,
       nullable: Boolean = true,
-      inArray: Boolean = false): ParquetType = {
+      inArray: Boolean = false,
+      toThriftSchemaNames: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) {
         Repetition.REPEATED
       } else {
         if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
       }
+    val arraySchemaName = if (toThriftSchemaNames) {
+      name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
+    } else {
+      CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+    }
     val typeInfo = fromPrimitiveDataType(ctype)
     typeInfo.map {
       case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
@@ -306,22 +312,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }.getOrElse {
       ctype match {
         case udt: UserDefinedType[_] => {
-          fromDataType(udt.sqlType, name, nullable, inArray)
+          fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
         }
         case ArrayType(elementType, false) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = false,
-            inArray = true)
+            inArray = true,
+            toThriftSchemaNames)
           ConversionPatterns.listType(repetition, name, parquetElementType)
         }
         case ArrayType(elementType, true) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = true,
-            inArray = false)
+            inArray = false,
+            toThriftSchemaNames)
           ConversionPatterns.listType(
             repetition,
             name,
@@ -332,7 +340,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
         }
         case StructType(structFields) => {
           val fields = structFields.map {
-            field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+            field => fromDataType(field.dataType, field.name, field.nullable,
+              inArray = false, toThriftSchemaNames)
           }
           new ParquetGroupType(repetition, name, fields.toSeq)
         }
@@ -342,13 +351,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
             keyType,
             CatalystConverter.MAP_KEY_SCHEMA_NAME,
             nullable = false,
-            inArray = false)
+            inArray = false,
+            toThriftSchemaNames)
           val parquetValueType =
             fromDataType(
               valueType,
               CatalystConverter.MAP_VALUE_SCHEMA_NAME,
               nullable = valueContainsNull,
-              inArray = false)
+              inArray = false,
+              toThriftSchemaNames)
           ConversionPatterns.mapType(
             repetition,
             name,
@@ -374,10 +385,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
         field.getRepetition != Repetition.REQUIRED)())
   }
 
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+  def convertFromAttributes(attributes: Seq[Attribute],
+      toThriftSchemaNames: Boolean = false): MessageType = {
     val fields = attributes.map(
       attribute =>
-        fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+        fromDataType(attribute.dataType, attribute.name, attribute.nullable,
+          toThriftSchemaNames = toThriftSchemaNames))
     new MessageType("root", fields)
   }
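
An end-to-end sketch of the new flag, using the same internal calls as the test suite below (Spark 1.3-era APIs; ParquetTypesConverter is package-private, so this assumes code living in org.apache.spark.sql.parquet):

import org.apache.spark.sql.catalyst.ScalaReflection

case class Record(numbers: Seq[Int])

val attributes = ScalaReflection.attributesFor[Record]
// Default naming: the repeated element inside the LIST group is called "array".
val avroStyle = ParquetTypesConverter.convertFromAttributes(attributes)
// Thrift naming: the repeated element becomes "numbers_tuple".
val thriftStyle = ParquetTypesConverter.convertFromAttributes(attributes, toThriftSchemaNames = true)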

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

@@ -33,9 +33,10 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
    * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
    */
   private def testSchema[T <: Product: ClassTag: TypeTag](
-      testName: String, messageType: String): Unit = {
+      testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
     test(testName) {
-      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T],
+        isThriftDerived)
       val expected = MessageTypeParser.parseMessageType(messageType)
       actual.checkContains(expected)
       expected.checkContains(actual)
@@ -146,6 +147,29 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
+  // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+  // as expected from attributes
+  testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+    "thrift generated parquet schema",
+    """
+      |message root {
+      |  optional binary _1 (UTF8);
+      |  optional binary _2 (UTF8);
+      |  optional binary _3 (UTF8);
+      |  optional group _4 (LIST) {
+      |    repeated int32 _4_tuple;
+      |  }
+      |  optional group _5 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required binary key (UTF8);
+      |      optional group value (LIST) {
+      |        repeated int32 value_tuple;
+      |      }
+      |    }
+      |  }
+      |}
+    """.stripMargin, isThriftDerived = true)
+
   test("DataType string parser compatibility") {
     // This is the generated string from previous versions of the Spark SQL, using the following:
     // val schema = StructType(List(