From dba98bf6987ec39380f1a5b0ca2772b694452231 Mon Sep 17 00:00:00 2001 From: Sadhan Sood Date: Wed, 4 Feb 2015 19:18:06 -0800 Subject: [PATCH] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as r... ...aised in SPARK-4520. The exception is thrown only for a thrift generated parquet file. The array element schema name is assumed as "array" as per ParquetAvro but for thrift generated parquet files, it is array_name + "_tuple". This leads to missing child of array group type and hence when the parquet rows are being materialized leads to the exception. Author: Sadhan Sood Closes #4148 from sadhan/SPARK-4520 and squashes the following commits: c5ccde8 [Sadhan Sood] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520. --- .../spark/sql/parquet/ParquetConverter.scala | 5 +++ .../sql/parquet/ParquetTableSupport.scala | 6 +++- .../spark/sql/parquet/ParquetTypes.scala | 35 +++++++++++++------ .../sql/parquet/ParquetSchemaSuite.scala | 28 +++++++++++++-- 4 files changed, 60 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index d87ddfeabd..7d62f3728f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -66,6 +66,11 @@ private[sql] object CatalystConverter { // Using a different value will result in Parquet silently dropping columns. val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" val ARRAY_ELEMENTS_SCHEMA_NAME = "array" + // SPARK-4520: Thrift generated parquet files have different array element + // schema names than avro. Thrift parquet uses array_schema_name + "_tuple" + // as opposed to "array" used by default. For more information, check + // TestThriftSchemaConverter.java in parquet.thrift. + val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple" val MAP_KEY_SCHEMA_NAME = "key" val MAP_VALUE_SCHEMA_NAME = "value" val MAP_SCHEMA_NAME = "map" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 3fb1cc4105..14c81ae4eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -99,7 +99,11 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { val requestedAttributes = RowReadSupport.getRequestedSchema(configuration) if (requestedAttributes != null) { - parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes) + // If the parquet file is thrift derived, there is a good chance that + // it will have the thrift class in metadata. + val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class") + parquetSchema = ParquetTypesConverter + .convertFromAttributes(requestedAttributes, isThriftDerived) metadata.put( RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, ParquetTypesConverter.convertToString(requestedAttributes)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index f1d4ff2387..b646109b7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -285,13 +285,19 @@ private[parquet] object ParquetTypesConverter extends Logging { ctype: DataType, name: String, nullable: Boolean = true, - inArray: Boolean = false): ParquetType = { + inArray: Boolean = false, + toThriftSchemaNames: Boolean = false): ParquetType = { val repetition = if (inArray) { Repetition.REPEATED } else { if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED } + val arraySchemaName = if (toThriftSchemaNames) { + name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX + } else { + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME + } val typeInfo = fromPrimitiveDataType(ctype) typeInfo.map { case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) => @@ -306,22 +312,24 @@ private[parquet] object ParquetTypesConverter extends Logging { }.getOrElse { ctype match { case udt: UserDefinedType[_] => { - fromDataType(udt.sqlType, name, nullable, inArray) + fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames) } case ArrayType(elementType, false) => { val parquetElementType = fromDataType( elementType, - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + arraySchemaName, nullable = false, - inArray = true) + inArray = true, + toThriftSchemaNames) ConversionPatterns.listType(repetition, name, parquetElementType) } case ArrayType(elementType, true) => { val parquetElementType = fromDataType( elementType, - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + arraySchemaName, nullable = true, - inArray = false) + inArray = false, + toThriftSchemaNames) ConversionPatterns.listType( repetition, name, @@ -332,7 +340,8 @@ private[parquet] object ParquetTypesConverter extends Logging { } case StructType(structFields) => { val fields = structFields.map { - field => fromDataType(field.dataType, field.name, field.nullable, inArray = false) + field => fromDataType(field.dataType, field.name, field.nullable, + inArray = false, toThriftSchemaNames) } new ParquetGroupType(repetition, name, fields.toSeq) } @@ -342,13 +351,15 @@ private[parquet] object ParquetTypesConverter extends Logging { keyType, CatalystConverter.MAP_KEY_SCHEMA_NAME, nullable = false, - inArray = false) + inArray = false, + toThriftSchemaNames) val parquetValueType = fromDataType( valueType, CatalystConverter.MAP_VALUE_SCHEMA_NAME, nullable = valueContainsNull, - inArray = false) + inArray = false, + toThriftSchemaNames) ConversionPatterns.mapType( repetition, name, @@ -374,10 +385,12 @@ private[parquet] object ParquetTypesConverter extends Logging { field.getRepetition != Repetition.REQUIRED)()) } - def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { + def convertFromAttributes(attributes: Seq[Attribute], + toThriftSchemaNames: Boolean = false): MessageType = { val fields = attributes.map( attribute => - fromDataType(attribute.dataType, attribute.name, attribute.nullable)) + fromDataType(attribute.dataType, attribute.name, attribute.nullable, + toThriftSchemaNames = toThriftSchemaNames)) new MessageType("root", fields) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index 64274950b8..5f7f31d395 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -33,9 +33,10 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`. */ private def testSchema[T <: Product: ClassTag: TypeTag]( - testName: String, messageType: String): Unit = { + testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = { test(testName) { - val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T]) + val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T], + isThriftDerived) val expected = MessageTypeParser.parseMessageType(messageType) actual.checkContains(expected) expected.checkContains(actual) @@ -146,6 +147,29 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { |} """.stripMargin) + // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated + // as expected from attributes + testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])]( + "thrift generated parquet schema", + """ + |message root { + | optional binary _1 (UTF8); + | optional binary _2 (UTF8); + | optional binary _3 (UTF8); + | optional group _4 (LIST) { + | repeated int32 _4_tuple; + | } + | optional group _5 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required binary key (UTF8); + | optional group value (LIST) { + | repeated int32 value_tuple; + | } + | } + | } + |} + """.stripMargin, isThriftDerived = true) + test("DataType string parser compatibility") { // This is the generated string from previous versions of the Spark SQL, using the following: // val schema = StructType(List(