[SPARK-6123] [SPARK-6775] [SPARK-6776] [SQL] Refactors Parquet read path for interoperability and backwards-compatibility

This PR is a follow-up to #6617 and is part of [SPARK-6774] [2], which aims to ensure interoperability and backwards-compatibility for Spark SQL Parquet support. This one fixes the read path: Spark SQL is now expected to be able to read legacy Parquet data files generated by most (if not all) common libraries/tools such as parquet-thrift, parquet-avro, and parquet-hive. The write path still needs to be refactored to write standard Parquet LISTs and MAPs ([SPARK-8848] [4]).

### Major changes

1. `CatalystConverter` class hierarchy refactoring

   - Replaces `CatalystConverter` trait with a much simpler `ParentContainerUpdater`.

     Now, instead of extending the original `CatalystConverter` trait, every converter class accepts an updater that is responsible for propagating the converted value to some parent container: appending array elements to a parent array buffer, appending key-value pairs to a parent mutable map, or setting a converted value to a specific field of a parent row. The root converter doesn't have a parent and thus uses a `NoopUpdater`.

     This simplifies the design since converters don't need to care about details of their parent converters anymore.
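
     A simplified, self-contained sketch of the pattern (only `ParentContainerUpdater` is a name from this PR; the other names are illustrative, and the real trait also has primitive-specialized setters such as `setInt` and `setLong`):

     ```scala
     import scala.collection.mutable.ArrayBuffer

     // Illustrative sketch of the updater pattern described above; not the actual
     // Spark classes.
     trait ParentContainerUpdater {
       def set(value: Any): Unit
     }

     // Propagates a converted value into the `ordinal`-th cell of a parent "row",
     // modeled here as a plain Array[Any].
     class RowCellUpdater(row: Array[Any], ordinal: Int) extends ParentContainerUpdater {
       override def set(value: Any): Unit = row(ordinal) = value
     }

     // Appends converted array elements to a parent buffer.
     class BufferUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
       override def set(value: Any): Unit = buffer += value
     }

     object UpdaterSketch extends App {
       val row = new Array[Any](2)
       val elements = ArrayBuffer.empty[Any]

       // A converter only ever calls `set` on the updater it was given; it doesn't
       // need to know whether its parent is a row, an array buffer, or a map.
       new RowCellUpdater(row, 0).set("converted value")
       new BufferUpdater(elements).set(42)

       println(row.mkString("[", ", ", "]")) // [converted value, null]
       println(elements)                     // ArrayBuffer(42)
     }
     ```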

   - Unifies `CatalystRootConverter`, `CatalystGroupConverter` and `CatalystPrimitiveRowConverter` into `CatalystRowConverter`

     Specifically, now all row objects are represented by `SpecificMutableRow` during conversion.

   - Refactors `CatalystArrayConverter`, and removes `CatalystArrayContainsNullConverter` and `CatalystNativeArrayConverter`

     `CatalystNativeArrayConverter` was probably designed with the intention of avoiding boxing costs. However, the way it uses Scala generics actually doesn't achieve this goal.

     The new `CatalystArrayConverter` handles both nullable and non-nullable array elements in a consistent way.

   - Implements backwards-compatibility rules in `CatalystArrayConverter`

     When Parquet records are being converted, the schema of the Parquet file should already have been verified, so we only need to care about the structure rather than the field names in the Parquet schema. Since all map objects written by legacy systems have the same structure as the standard one (see [backwards-compatibility rules for MAP] [1]), we only need to deal with LIST (namely arrays) in `CatalystArrayConverter`; the structural check involved is sketched below.
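
     The check at the heart of this is the `isElementType` method added in `CatalystRowConverter.scala` (shown in full in the diff below); here it is as a standalone sketch with the converter plumbing stripped away:

     ```scala
     import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
     import org.apache.spark.sql.types.{DataType, StructType}

     object ListCompatibilitySketch {
       // Decides whether the repeated type nested inside a LIST-annotated group is
       // the element type itself (a legacy 2-level layout, as written by e.g.
       // parquet-avro and parquet-thrift) or the standard wrapper group around it:
       //
       //   <list-repetition> group <name> (LIST) {
       //     repeated group list {                 <-- `parquetRepeatedType` points here
       //       <element-repetition> <element-type> element;
       //     }
       //   }
       def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean =
         (parquetRepeatedType, catalystElementType) match {
           // A repeated primitive can only be the element itself.
           case (_: PrimitiveType, _) => true
           // A standard wrapper group has exactly one field, so a repeated group
           // with more than one field must be a struct element in a legacy layout.
           case (t: GroupType, _) if t.getFieldCount > 1 => true
           // A single-field group whose field name matches the expected Catalyst
           // struct field is the element itself rather than a wrapper.
           case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
           case _ => false
         }
     }
     ```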

2. Requested columns handling

   When specifying requested columns in `RowReadSupport`, we used to use a Parquet `MessageType` converted from a Catalyst `StructType` containing all requested columns. This is undesirable from a compatibility and interoperability standpoint, because the actual Parquet file may have a physical structure different from the converted schema.

   In this PR, the schema for the requested columns is constructed as follows (a sketch is given after the list):

   - For a column that exists in the target Parquet file, we extract the column type by name from the full file schema, and construct a single-field `MessageType` for that column.
   - For a column that doesn't exist in the target Parquet file, we create a single-field `StructType` and convert it to a `MessageType` using `CatalystSchemaConverter`.
   - All single-field `MessageType`s are then unioned into a full schema containing all requested fields.
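
   A minimal sketch of this assembly, assuming the parquet-mr `MessageType` API (`containsField`, `getType`, `union`); the object and method names here are illustrative, and the real logic lives in `RowReadSupport`:

   ```scala
   import org.apache.parquet.schema.MessageType
   import org.apache.spark.sql.types.{StructField, StructType}

   object RequestedSchemaSketch {
     // Illustrative only: assembles the Parquet schema for the requested columns as
     // described above. `fileSchema` is the full schema of a Parquet part-file,
     // `requestedFields` are the Catalyst columns asked for by the query, and
     // `toParquet` stands in for CatalystSchemaConverter (#6617), which converts a
     // Catalyst StructType into a Parquet MessageType.
     def buildRequestedSchema(
         fileSchema: MessageType,
         requestedFields: Seq[StructField],
         toParquet: StructType => MessageType): MessageType = {
       val singleFieldSchemas = requestedFields.map { field =>
         if (fileSchema.containsField(field.name)) {
           // The column exists in this part-file: reuse its physical type as-is.
           new MessageType(fileSchema.getName, fileSchema.getType(field.name))
         } else {
           // The column is missing from this part-file: derive its Parquet type
           // from the Catalyst type instead.
           toParquet(StructType(Seq(field)))
         }
       }
       // Union all single-field schemas into the full requested schema.
       // (The real code also has to handle an empty list of requested columns.)
       singleFieldSchemas.reduce((left, right) => left.union(right))
     }
   }
   ```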

   With this change, we also fix [SPARK-6123] [3] by validating the global schema against each individual Parquet part-file.

### Testing

This PR also adds compatibility tests for parquet-avro, parquet-thrift, and parquet-hive. Please refer to `README.md` under `sql/core/src/test` for more information about these tests. To avoid build-time code generation and extra complexity in the build system, the Java code generated from the test Thrift schema and Avro IDL is also checked in.

[1]: https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
[2]: https://issues.apache.org/jira/browse/SPARK-6774
[3]: https://issues.apache.org/jira/browse/SPARK-6123
[4]: https://issues.apache.org/jira/browse/SPARK-8848

Author: Cheng Lian <lian@databricks.com>

Closes #7231 from liancheng/spark-6776 and squashes the following commits:

360fe18 [Cheng Lian] Adds ParquetHiveCompatibilitySuite
c6fbc06 [Cheng Lian] Removes WIP file committed by mistake
b8c1295 [Cheng Lian] Excludes the whole parquet package from MiMa
598c3e8 [Cheng Lian] Adds extra Maven repo for hadoop-lzo, which is a transitive dependency of parquet-thrift
926af87 [Cheng Lian] Simplifies Parquet compatibility test suites
7946ee1 [Cheng Lian] Fixes Scala styling issues
3d7ab36 [Cheng Lian] Fixes .rat-excludes
a8f13bb [Cheng Lian] Using Parquet writer API to do compatibility tests
f2208cd [Cheng Lian] Adds README.md for Thrift/Avro code generation
1d390aa [Cheng Lian] Adds parquet-thrift compatibility test
440f7b3 [Cheng Lian] Adds generated files to .rat-excludes
13b9121 [Cheng Lian] Adds ParquetAvroCompatibilitySuite
06cfe9d [Cheng Lian] Adds comments about TimestampType handling
a099d3e [Cheng Lian] More comments
0cc1b37 [Cheng Lian] Fixes MiMa checks
884d3e6 [Cheng Lian] Fixes styling issue and reverts unnecessary changes
802cbd7 [Cheng Lian] Fixes bugs related to schema merging and empty requested columns
38fe1e7 [Cheng Lian] Adds explicit return type
7fb21f1 [Cheng Lian] Reverts an unnecessary debugging change
1781dff [Cheng Lian] Adds test case for SPARK-8811
6437d4b [Cheng Lian] Assembles requested schema from Parquet file schema
bcac49f [Cheng Lian] Removes the 16-byte restriction of decimals
a74fb2c [Cheng Lian] More comments
0525346 [Cheng Lian] Removes old Parquet record converters
03c3bd9 [Cheng Lian] Refactors Parquet read path to implement backwards-compatibility rules
Cheng Lian 2015-07-08 15:51:01 -07:00
parent 5687f76552
commit 4ffc27caaf
27 changed files with 5982 additions and 917 deletions


@@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr

33
pom.xml

@@ -161,6 +161,7 @@
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<thrift.version>0.9.2</thrift.version>
<!-- For maven shade plugin (see SPARK-8819) -->
<create.dependency.reduced.pom>false</create.dependency.reduced.pom>
@@ -179,6 +180,8 @@
<hbase.deps.scope>compile</hbase.deps.scope>
<hive.deps.scope>compile</hive.deps.scope>
<parquet.deps.scope>compile</parquet.deps.scope>
<parquet.test.deps.scope>test</parquet.test.deps.scope>
<thrift.test.deps.scope>test</thrift.test.deps.scope>
<!--
Overridable test home. So that you can call individual pom files directly without
@@ -270,6 +273,18 @@
<enabled>false</enabled>
</snapshots>
</repository>
<!-- For transitive dependencies brougt by parquet-thrift -->
<repository>
<id>twttr-repo</id>
<name>Twttr Repository</name>
<url>http://maven.twttr.com</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<!-- TODO: This can be deleted after Spark 1.4 is posted -->
<repository>
<id>spark-1.4-staging</id>
@@ -1101,6 +1116,24 @@
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-thrift</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
<scope>${thrift.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>


@@ -62,21 +62,8 @@ object MimaExcludes {
  "org.apache.spark.ml.classification.LogisticCostFun.this"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution"),
- // NanoTime and CatalystTimestampConverter is only used inside catalyst,
- // not needed anymore
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.timestamp.NanoTime"),
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.timestamp.NanoTime$"),
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
- // SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.ParquetTypeInfo"),
- ProblemFilters.exclude[MissingClassProblem](
-   "org.apache.spark.sql.parquet.ParquetTypeInfo$")
+ // Parquet support is considered private.
+ excludePackage("org.apache.spark.sql.parquet")
) ++ Seq(
// SPARK-8479 Add numNonzeros and numActives to Matrix.
ProblemFilters.exclude[MissingMethodProblem](


@@ -17,11 +17,12 @@
package org.apache.spark.sql.types
+ import scala.util.Try
import scala.util.parsing.combinator.RegexParsers
- import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
+ import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.DeveloperApi
@@ -82,6 +83,9 @@ abstract class DataType extends AbstractDataType {
object DataType {
private[sql] def fromString(raw: String): DataType = {
Try(DataType.fromJson(raw)).getOrElse(DataType.fromCaseClassString(raw))
}
def fromJson(json: String): DataType = parseDataType(parse(json))


@@ -311,6 +311,11 @@ object StructType extends AbstractDataType {
private[sql] override def simpleString: String = "struct"
private[sql] def fromString(raw: String): StructType = DataType.fromString(raw) match {
case t: StructType => t
case _ => throw new RuntimeException(s"Failed parsing StructType: $raw")
}
def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
def apply(fields: java.util.List[StructField]): StructType = {


@@ -101,9 +101,45 @@
<version>9.3-1102-jdbc41</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-thrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
<source>src/test/gen-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,434 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.nio.ByteOrder
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
* corresponding parent container. For example, a converter for a `StructType` field may set
* converted values to a [[MutableRow]]; or a converter for array elements may append converted
* values to an [[ArrayBuffer]].
*/
private[parquet] trait ParentContainerUpdater {
def set(value: Any): Unit = ()
def setBoolean(value: Boolean): Unit = set(value)
def setByte(value: Byte): Unit = set(value)
def setShort(value: Short): Unit = set(value)
def setInt(value: Int): Unit = set(value)
def setLong(value: Long): Unit = set(value)
def setFloat(value: Float): Unit = set(value)
def setDouble(value: Double): Unit = set(value)
}
/** A no-op updater used for root converter (who doesn't have a parent). */
private[parquet] object NoopUpdater extends ParentContainerUpdater
/**
* A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[Row]]s. Since
* any Parquet record is also a struct, this converter can also be used as root converter.
*
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class CatalystRowConverter(
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
extends GroupConverter {
/**
* Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
* converted filed values to the `ordinal`-th cell in `currentRow`.
*/
private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
override def set(value: Any): Unit = row(ordinal) = value
override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
override def setShort(value: Short): Unit = row.setShort(ordinal, value)
override def setInt(value: Int): Unit = row.setInt(ordinal, value)
override def setLong(value: Long): Unit = row.setLong(ordinal, value)
override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
/**
* Represents the converted row object once an entire Parquet record is converted.
*
* @todo Uses [[UnsafeRow]] for better performance.
*/
val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
// Converters for each field.
private val fieldConverters: Array[Converter] = {
parquetType.getFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
}.toArray
}
override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
override def end(): Unit = updater.set(currentRow)
override def start(): Unit = {
var i = 0
while (i < currentRow.length) {
currentRow.setNullAt(i)
i += 1
}
}
/**
* Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
* `catalystType`. Converted values are handled by `updater`.
*/
private def newConverter(
parquetType: Type,
catalystType: DataType,
updater: ParentContainerUpdater): Converter = {
catalystType match {
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
new CatalystPrimitiveConverter(updater)
case ByteType =>
new PrimitiveConverter {
override def addInt(value: Int): Unit =
updater.setByte(value.asInstanceOf[ByteType#InternalType])
}
case ShortType =>
new PrimitiveConverter {
override def addInt(value: Int): Unit =
updater.setShort(value.asInstanceOf[ShortType#InternalType])
}
case t: DecimalType =>
new CatalystDecimalConverter(t, updater)
case StringType =>
new CatalystStringConverter(updater)
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new PrimitiveConverter {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
assert(
value.length() == 12,
"Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
s"but got a ${value.length()}-byte binary.")
val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buf.getLong
val julianDay = buf.getInt
updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
}
}
case DateType =>
new PrimitiveConverter {
override def addInt(value: Int): Unit = {
// DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(value.asInstanceOf[DateType#InternalType])
}
}
case t: ArrayType =>
new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
case t: MapType =>
new CatalystMapConverter(parquetType.asGroupType(), t, updater)
case t: StructType =>
new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[Row].copy())
})
case t: UserDefinedType[_] =>
val catalystTypeForUDT = t.sqlType
val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
val field = StructField("udt", catalystTypeForUDT, nullable)
val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
case _ =>
throw new RuntimeException(
s"Unable to create Parquet converter for data type ${catalystType.json}")
}
}
/**
* Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
* are handled by this converter. Parquet primitive types are only a subset of those of Spark
* SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
*/
private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
extends PrimitiveConverter {
override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
override def addInt(value: Int): Unit = updater.setInt(value)
override def addLong(value: Long): Unit = updater.setLong(value)
override def addFloat(value: Float): Unit = updater.setFloat(value)
override def addDouble(value: Double): Unit = updater.setDouble(value)
override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
}
/**
* Parquet converter for strings. A dictionary is used to minimize string decoding cost.
*/
private final class CatalystStringConverter(updater: ParentContainerUpdater)
extends PrimitiveConverter {
private var expandedDictionary: Array[UTF8String] = null
override def hasDictionarySupport: Boolean = true
override def setDictionary(dictionary: Dictionary): Unit = {
this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
}
}
override def addValueFromDictionary(dictionaryId: Int): Unit = {
updater.set(expandedDictionary(dictionaryId))
}
override def addBinary(value: Binary): Unit = {
updater.set(UTF8String.fromBytes(value.getBytes))
}
}
/**
* Parquet converter for fixed-precision decimals.
*/
private final class CatalystDecimalConverter(
decimalType: DecimalType,
updater: ParentContainerUpdater)
extends PrimitiveConverter {
// Converts decimals stored as INT32
override def addInt(value: Int): Unit = {
addLong(value: Long)
}
// Converts decimals stored as INT64
override def addLong(value: Long): Unit = {
updater.set(Decimal(value, decimalType.precision, decimalType.scale))
}
// Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
override def addBinary(value: Binary): Unit = {
updater.set(toDecimal(value))
}
private def toDecimal(value: Binary): Decimal = {
val precision = decimalType.precision
val scale = decimalType.scale
val bytes = value.getBytes
var unscaled = 0L
var i = 0
while (i < bytes.length) {
unscaled = (unscaled << 8) | (bytes(i) & 0xff)
i += 1
}
val bits = 8 * bytes.length
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
Decimal(unscaled, precision, scale)
}
}
/**
* Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard
* Parquet lists are represented as a 3-level group annotated by `LIST`:
* {{{
* <list-repetition> group <name> (LIST) { <-- parquetSchema points here
* repeated group list {
* <element-repetition> <element-type> element;
* }
* }
* }}}
* The `parquetSchema` constructor argument points to the outermost group.
*
* However, before this representation is standardized, some Parquet libraries/tools also use some
* non-standard formats to represent list-like structures. Backwards-compatibility rules for
* handling these cases are described in Parquet format spec.
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
private final class CatalystArrayConverter(
parquetSchema: GroupType,
catalystSchema: ArrayType,
updater: ParentContainerUpdater)
extends GroupConverter {
private var currentArray: ArrayBuffer[Any] = _
private val elementConverter: Converter = {
val repeatedType = parquetSchema.getType(0)
val elementType = catalystSchema.elementType
if (isElementType(repeatedType, elementType)) {
newConverter(repeatedType, elementType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentArray += value
})
} else {
new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
}
}
override def getConverter(fieldIndex: Int): Converter = elementConverter
override def end(): Unit = updater.set(currentArray)
// NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
// next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
// in row cells.
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
// scalastyle:off
/**
* Returns whether the given type is the element type of a list or is a syntactic group with
* one field that is the element type. This is determined by checking whether the type can be
* a syntactic group and by checking whether a potential syntactic group matches the expected
* schema.
* {{{
* <list-repetition> group <name> (LIST) {
* repeated group list { <-- repeatedType points here
* <element-repetition> <element-type> element;
* }
* }
* }}}
* In short, here we handle Parquet list backwards-compatibility rules on the read path. This
* method is based on `AvroIndexedRecordConverter.isElementType`.
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
*/
// scalastyle:on
private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = {
(parquetRepeatedType, catalystElementType) match {
case (t: PrimitiveType, _) => true
case (t: GroupType, _) if t.getFieldCount > 1 => true
case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
case _ => false
}
}
/** Array element converter */
private final class ElementConverter(parquetType: Type, catalystType: DataType)
extends GroupConverter {
private var currentElement: Any = _
private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentElement = value
})
override def getConverter(fieldIndex: Int): Converter = converter
override def end(): Unit = currentArray += currentElement
override def start(): Unit = currentElement = null
}
}
/** Parquet converter for maps */
private final class CatalystMapConverter(
parquetType: GroupType,
catalystType: MapType,
updater: ParentContainerUpdater)
extends GroupConverter {
private var currentMap: mutable.Map[Any, Any] = _
private val keyValueConverter = {
val repeatedType = parquetType.getType(0).asGroupType()
new KeyValueConverter(
repeatedType.getType(0),
repeatedType.getType(1),
catalystType.keyType,
catalystType.valueType)
}
override def getConverter(fieldIndex: Int): Converter = keyValueConverter
override def end(): Unit = updater.set(currentMap)
// NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
// value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
// cells.
override def start(): Unit = currentMap = mutable.Map.empty[Any, Any]
/** Parquet converter for key-value pairs within the map. */
private final class KeyValueConverter(
parquetKeyType: Type,
parquetValueType: Type,
catalystKeyType: DataType,
catalystValueType: DataType)
extends GroupConverter {
private var currentKey: Any = _
private var currentValue: Any = _
private val converters = Array(
// Converter for keys
newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentKey = value
}),
// Converter for values
newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentValue = value
}))
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
override def end(): Unit = currentMap(currentKey) = currentValue
override def start(): Unit = {
currentKey = null
currentValue = null
}
}
}
}


@@ -358,9 +358,24 @@ private[parquet] class CatalystSchemaConverter(
case DateType =>
Types.primitive(INT32, repetition).as(DATE).named(field.name)
- // NOTE: !! This timestamp type is not specified in Parquet format spec !!
- // However, Impala and older versions of Spark SQL use INT96 to store timestamps with
- // nanosecond precision (not TIME_MILLIS or TIMESTAMP_MILLIS described in the spec).
+ // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
+ //
+ // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
+ // timestamp in Impala for some historical reasons, it's not recommended to be used for any
+ // other types and will probably be deprecated in future Parquet format spec. That's the
+ // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
+ // are both logical types annotating `INT64`.
+ //
+ // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
+ // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
+ // a timestamp into a `Long`. This design decision is subject to change though, for example,
+ // we may resort to microsecond precision in the future.
+ //
+ // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
+ // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
+ // hasn't implemented `TIMESTAMP_MICROS` yet.
+ //
+ // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)


@@ -17,61 +17,15 @@
package org.apache.spark.sql.parquet
import java.nio.ByteOrder
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}
import org.apache.parquet.Preconditions
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* Collection of converters of Parquet types (group and primitive types) that
* model arrays and maps. The conversions are partly based on the AvroParquet
* converters that are part of Parquet in order to be able to process these
* types.
*
* There are several types of converters:
* <ul>
* <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for primitive
* (numeric, boolean and String) types</li>
* <li>[[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for arrays
* of native JVM element types; note: currently null values are not supported!</li>
* <li>[[org.apache.spark.sql.parquet.CatalystArrayConverter]] for arrays of
* arbitrary element types (including nested element types); note: currently
* null values are not supported!</li>
* <li>[[org.apache.spark.sql.parquet.CatalystStructConverter]] for structs</li>
* <li>[[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; note:
* currently null values are not supported!</li>
* <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] for rows
* of only primitive element types</li>
* <li>[[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other nested
* records, including the top-level row record</li>
* </ul>
*/
private[sql] object CatalystConverter {
// The type internally used for fields
type FieldType = StructField
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
// Note that "array" for the array elements is chosen by ParquetAvro.
// Using a different value will result in Parquet silently dropping columns.
val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
// SPARK-4520: Thrift generated parquet files have different array element
// schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
// as opposed to "array" used by default. For more information, check
// TestThriftSchemaConverter.java in parquet.thrift.
val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
val MAP_KEY_SCHEMA_NAME = "key"
val MAP_VALUE_SCHEMA_NAME = "value"
val MAP_SCHEMA_NAME = "map"
@@ -80,787 +34,4 @@ private[sql] object CatalystConverter {
type ArrayScalaType[T] = Seq[T]
type StructScalaType[T] = InternalRow
type MapScalaType[K, V] = Map[K, V]
protected[parquet] def createConverter(
field: FieldType,
fieldIndex: Int,
parent: CatalystConverter): Converter = {
val fieldType: DataType = field.dataType
fieldType match {
case udt: UserDefinedType[_] => {
createConverter(field.copy(dataType = udt.sqlType), fieldIndex, parent)
}
// For native JVM types we use a converter with native arrays
case ArrayType(elementType: AtomicType, false) => {
new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
}
// This is for other types of arrays, including those with nested fields
case ArrayType(elementType: DataType, false) => {
new CatalystArrayConverter(elementType, fieldIndex, parent)
}
case ArrayType(elementType: DataType, true) => {
new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
}
case StructType(fields: Array[StructField]) => {
new CatalystStructConverter(fields, fieldIndex, parent)
}
case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => {
new CatalystMapConverter(
Array(
new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)),
fieldIndex,
parent)
}
// Strings, Shorts and Bytes do not have a corresponding type in Parquet
// so we need to treat them separately
case StringType =>
new CatalystPrimitiveStringConverter(parent, fieldIndex)
case ShortType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.InternalType])
}
}
case ByteType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.InternalType])
}
}
case DateType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
parent.updateDate(fieldIndex, value.asInstanceOf[DateType.InternalType])
}
}
case d: DecimalType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.updateDecimal(fieldIndex, value, d)
}
}
case TimestampType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.updateTimestamp(fieldIndex, value)
}
}
// All other primitive types use the default converter
case ctype: DataType if ParquetTypesConverter.isPrimitiveType(ctype) => {
// note: need the type tag here!
new CatalystPrimitiveConverter(parent, fieldIndex)
}
case _ => throw new RuntimeException(
s"unable to convert datatype ${field.dataType.toString} in CatalystConverter")
}
}
protected[parquet] def createRootConverter(
parquetSchema: MessageType,
attributes: Seq[Attribute]): CatalystConverter = {
// For non-nested types we use the optimized Row converter
if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
new CatalystPrimitiveRowConverter(attributes.toArray)
} else {
new CatalystGroupConverter(attributes.toArray)
}
}
}
private[parquet] abstract class CatalystConverter extends GroupConverter {
/**
* The number of fields this group has
*/
protected[parquet] val size: Int
/**
* The index of this converter in the parent
*/
protected[parquet] val index: Int
/**
* The parent converter
*/
protected[parquet] val parent: CatalystConverter
/**
* Called by child converters to update their value in its parent (this).
* Note that if possible the more specific update methods below should be used
* to avoid auto-boxing of native JVM types.
*
* @param fieldIndex
* @param value
*/
protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit
protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
updateField(fieldIndex, value)
protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, value.getBytes)
protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
updateField(fieldIndex, UTF8String.fromBytes(value))
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, readTimestamp(value))
protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
protected[parquet] def isRootConverter: Boolean = parent == null
protected[parquet] def clearBuffer(): Unit
/**
* Should only be called in the root (group) converter!
*
* @return
*/
def getCurrentRecord: InternalRow = throw new UnsupportedOperationException
/**
* Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
* a long (i.e. precision <= 18)
*
* Returned value is needed by CatalystConverter, which doesn't reuse the Decimal object.
*/
protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Decimal = {
val precision = ctype.precisionInfo.get.precision
val scale = ctype.precisionInfo.get.scale
val bytes = value.getBytes
require(bytes.length <= 16, "Decimal field too large to read")
var unscaled = 0L
var i = 0
while (i < bytes.length) {
unscaled = (unscaled << 8) | (bytes(i) & 0xFF)
i += 1
}
// Make sure unscaled has the right sign, by sign-extending the first bit
val numBits = 8 * bytes.length
unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
dest.set(unscaled, precision, scale)
}
/**
* Read a Timestamp value from a Parquet Int96Value
*/
protected[parquet] def readTimestamp(value: Binary): Long = {
Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
val buf = value.toByteBuffer
buf.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buf.getLong
val julianDay = buf.getInt
DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}
}
/**
* A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
* to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object.
*
* @param schema The corresponding Catalyst schema in the form of a list of attributes.
*/
private[parquet] class CatalystGroupConverter(
protected[parquet] val schema: Array[FieldType],
protected[parquet] val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var current: ArrayBuffer[Any],
protected[parquet] var buffer: ArrayBuffer[InternalRow])
extends CatalystConverter {
def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
this(
schema,
index,
parent,
current = null,
buffer = new ArrayBuffer[InternalRow](
CatalystArrayConverter.INITIAL_ARRAY_SIZE))
/**
* This constructor is used for the root converter only!
*/
def this(attributes: Array[Attribute]) =
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
protected [parquet] val converters: Array[Converter] =
schema.zipWithIndex.map {
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
}.toArray
override val size = schema.size
override def getCurrentRecord: InternalRow = {
assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
// TODO: use iterators if possible
// Note: this will ever only be called in the root converter when the record has been
// fully processed. Therefore it will be difficult to use mutable rows instead, since
// any non-root converter never would be sure when it would be safe to re-use the buffer.
new GenericInternalRow(current.toArray)
}
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
// for child converters to update upstream values
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
current.update(fieldIndex, value)
}
override protected[parquet] def clearBuffer(): Unit = buffer.clear()
override def start(): Unit = {
current = ArrayBuffer.fill(size)(null)
converters.foreach { converter =>
if (!converter.isPrimitive) {
converter.asInstanceOf[CatalystConverter].clearBuffer()
}
}
}
override def end(): Unit = {
if (!isRootConverter) {
assert(current != null) // there should be no empty groups
buffer.append(new GenericInternalRow(current.toArray))
parent.updateField(index, new GenericInternalRow(buffer.toArray.asInstanceOf[Array[Any]]))
}
}
}
/**
* A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
* to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object. Note that his
* converter is optimized for rows of primitive types (non-nested records).
*/
private[parquet] class CatalystPrimitiveRowConverter(
protected[parquet] val schema: Array[FieldType],
protected[parquet] var current: MutableRow)
extends CatalystConverter {
// This constructor is used for the root converter only
def this(attributes: Array[Attribute]) =
this(
attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
new SpecificMutableRow(attributes.map(_.dataType)))
protected [parquet] val converters: Array[Converter] =
schema.zipWithIndex.map {
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
}.toArray
override val size = schema.size
override val index = 0
override val parent = null
// Should be only called in root group converter!
override def getCurrentRecord: InternalRow = current
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
// for child converters to update upstream values
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
throw new UnsupportedOperationException // child converters should use the
// specific update methods below
}
override protected[parquet] def clearBuffer(): Unit = {}
override def start(): Unit = {
var i = 0
while (i < size) {
current.setNullAt(i)
i = i + 1
}
}
override def end(): Unit = {}
// Overridden here to avoid auto-boxing for primitive types
override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
current.setBoolean(fieldIndex, value)
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
current.setInt(fieldIndex, value)
override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
current.setInt(fieldIndex, value)
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
current.setLong(fieldIndex, value)
override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
current.setShort(fieldIndex, value)
override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
current.setByte(fieldIndex, value)
override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
current.setDouble(fieldIndex, value)
override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
current.setFloat(fieldIndex, value)
override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
current.update(fieldIndex, value.getBytes)
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
current.update(fieldIndex, UTF8String.fromBytes(value))
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
current.setLong(fieldIndex, readTimestamp(value))
override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
var decimal = current(fieldIndex).asInstanceOf[Decimal]
if (decimal == null) {
decimal = new Decimal
current(fieldIndex) = decimal
}
readDecimal(decimal, value, ctype)
}
}
/**
* A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
*
* @param parent The parent group converter.
* @param fieldIndex The index inside the record.
*/
private[parquet] class CatalystPrimitiveConverter(
parent: CatalystConverter,
fieldIndex: Int) extends PrimitiveConverter {
override def addBinary(value: Binary): Unit =
parent.updateBinary(fieldIndex, value)
override def addBoolean(value: Boolean): Unit =
parent.updateBoolean(fieldIndex, value)
override def addDouble(value: Double): Unit =
parent.updateDouble(fieldIndex, value)
override def addFloat(value: Float): Unit =
parent.updateFloat(fieldIndex, value)
override def addInt(value: Int): Unit =
parent.updateInt(fieldIndex, value)
override def addLong(value: Long): Unit =
parent.updateLong(fieldIndex, value)
}
/**
* A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String.
* Supports dictionaries to reduce Binary to String conversion overhead.
*
* Follows pattern in Parquet of using dictionaries, where supported, for String conversion.
*
* @param parent The parent group converter.
* @param fieldIndex The index inside the record.
*/
private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
extends CatalystPrimitiveConverter(parent, fieldIndex) {
private[this] var dict: Array[Array[Byte]] = null
override def hasDictionarySupport: Boolean = true
override def setDictionary(dictionary: Dictionary): Unit =
dict = Array.tabulate(dictionary.getMaxId + 1) { dictionary.decodeToBinary(_).getBytes }
override def addValueFromDictionary(dictionaryId: Int): Unit =
parent.updateString(fieldIndex, dict(dictionaryId))
override def addBinary(value: Binary): Unit =
parent.updateString(fieldIndex, value.getBytes)
}
private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}
/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array (see
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.types.ArrayType]].
*
* @param elementType The type of the array elements (complex or primitive)
* @param index The position of this (array) field inside its parent converter
* @param parent The parent converter
* @param buffer A data buffer
*/
private[parquet] class CatalystArrayConverter(
val elementType: DataType,
val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var buffer: Buffer[Any])
extends CatalystConverter {
def this(elementType: DataType, index: Int, parent: CatalystConverter) =
this(
elementType,
index,
parent,
new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
protected[parquet] val converter: Converter = CatalystConverter.createConverter(
new CatalystConverter.FieldType(
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
elementType,
false),
fieldIndex = 0,
parent = this)
override def getConverter(fieldIndex: Int): Converter = converter
// arrays have only one (repeated) field, which is its elements
override val size = 1
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
// fieldIndex is ignored (assumed to be zero but not checked)
if (value == null) {
throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!")
}
buffer += value
}
override protected[parquet] def clearBuffer(): Unit = {
buffer.clear()
}
override def start(): Unit = {
if (!converter.isPrimitive) {
converter.asInstanceOf[CatalystConverter].clearBuffer()
}
}
override def end(): Unit = {
assert(parent != null)
// here we need to make sure to use ArrayScalaType
parent.updateField(index, buffer.toArray.toSeq)
clearBuffer()
}
}
/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array (see
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.types.ArrayType]].
*
* @param elementType The type of the array elements (native)
* @param index The position of this (array) field inside its parent converter
* @param parent The parent converter
* @param capacity The (initial) capacity of the buffer
*/
private[parquet] class CatalystNativeArrayConverter(
val elementType: AtomicType,
val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
extends CatalystConverter {
type NativeType = elementType.InternalType
private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity)
private var elements: Int = 0
protected[parquet] val converter: Converter = CatalystConverter.createConverter(
new CatalystConverter.FieldType(
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
elementType,
false),
fieldIndex = 0,
parent = this)
override def getConverter(fieldIndex: Int): Converter = converter
// arrays have only one (repeated) field, which is its elements
override val size = 1
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
throw new UnsupportedOperationException
// Overridden here to avoid auto-boxing for primitive types
override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = {
checkGrowBuffer()
buffer(elements) = value.getBytes.asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit = {
checkGrowBuffer()
buffer(elements) = UTF8String.fromBytes(value).asInstanceOf[NativeType]
elements += 1
}
override protected[parquet] def clearBuffer(): Unit = {
elements = 0
}
override def start(): Unit = {}
override def end(): Unit = {
assert(parent != null)
// here we need to make sure to use ArrayScalaType
parent.updateField(
index,
buffer.slice(0, elements).toSeq)
clearBuffer()
}
private def checkGrowBuffer(): Unit = {
if (elements >= capacity) {
val newCapacity = 2 * capacity
val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity)
Array.copy(buffer, 0, tmp, 0, capacity)
buffer = tmp
capacity = newCapacity
}
}
}
/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array contains null (see
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.types.ArrayType]].
*
* @param elementType The type of the array elements (complex or primitive)
* @param index The position of this (array) field inside its parent converter
* @param parent The parent converter
* @param buffer A data buffer
*/
private[parquet] class CatalystArrayContainsNullConverter(
val elementType: DataType,
val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var buffer: Buffer[Any])
extends CatalystConverter {
def this(elementType: DataType, index: Int, parent: CatalystConverter) =
this(
elementType,
index,
parent,
new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
protected[parquet] val converter: Converter = new CatalystConverter {
private var current: Any = null
val converter = CatalystConverter.createConverter(
new CatalystConverter.FieldType(
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
elementType,
false),
fieldIndex = 0,
parent = this)
override def getConverter(fieldIndex: Int): Converter = converter
override def end(): Unit = parent.updateField(index, current)
override def start(): Unit = {
current = null
}
override protected[parquet] val size: Int = 1
override protected[parquet] val index: Int = 0
override protected[parquet] val parent = CatalystArrayContainsNullConverter.this
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
current = value
}
override protected[parquet] def clearBuffer(): Unit = {}
}
override def getConverter(fieldIndex: Int): Converter = converter
// arrays have only one (repeated) field, which is its elements
override val size = 1
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
buffer += value
}
override protected[parquet] def clearBuffer(): Unit = {
buffer.clear()
}
override def start(): Unit = {}
override def end(): Unit = {
assert(parent != null)
// here we need to make sure to use ArrayScalaType
parent.updateField(index, buffer.toArray.toSeq)
clearBuffer()
}
}
/**
* This converter is for multi-element groups of primitive or complex types
* that have repetition level optional or required (so struct fields).
*
* @param schema The corresponding Catalyst schema in the form of a list of
* attributes.
* @param index
* @param parent
*/
private[parquet] class CatalystStructConverter(
override protected[parquet] val schema: Array[FieldType],
override protected[parquet] val index: Int,
override protected[parquet] val parent: CatalystConverter)
extends CatalystGroupConverter(schema, index, parent) {
override protected[parquet] def clearBuffer(): Unit = {}
// TODO: think about reusing the buffer
override def end(): Unit = {
assert(!isRootConverter)
// here we need to make sure to use StructScalaType
// Note: we need to actually make a copy of the array since we
// may be in a nested field
parent.updateField(index, new GenericInternalRow(current.toArray))
}
}
/**
* A `parquet.io.api.GroupConverter` that converts two-element groups that
* match the characteristics of a map (see
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.types.MapType]].
*
* @param schema
* @param index
* @param parent
*/
private[parquet] class CatalystMapConverter(
protected[parquet] val schema: Array[FieldType],
override protected[parquet] val index: Int,
override protected[parquet] val parent: CatalystConverter)
extends CatalystConverter {
private val map = new HashMap[Any, Any]()
private val keyValueConverter = new CatalystConverter {
private var currentKey: Any = null
private var currentValue: Any = null
val keyConverter = CatalystConverter.createConverter(schema(0), 0, this)
val valueConverter = CatalystConverter.createConverter(schema(1), 1, this)
override def getConverter(fieldIndex: Int): Converter = {
if (fieldIndex == 0) keyConverter else valueConverter
}
override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue
override def start(): Unit = {
currentKey = null
currentValue = null
}
override protected[parquet] val size: Int = 2
override protected[parquet] val index: Int = 0
override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
fieldIndex match {
case 0 =>
currentKey = value
case 1 =>
currentValue = value
case _ =>
throw new RuntimeException(s"trying to update Map with fieldIndex $fieldIndex")
}
}
override protected[parquet] def clearBuffer(): Unit = {}
}
override protected[parquet] val size: Int = 1
override protected[parquet] def clearBuffer(): Unit = {}
override def start(): Unit = {
map.clear()
}
override def end(): Unit = {
// here we need to make sure to use MapScalaType
parent.updateField(index, map.toMap)
}
override def getConverter(fieldIndex: Int): Converter = keyValueConverter
override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
throw new UnsupportedOperationException
} }
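For contrast with the `parent.updateField(index, value)` plumbing used by the converters above, here is a minimal, hypothetical sketch of the updater-style callback used by the new converters in the diff below. Only the `NoopUpdater` name appears in the new code shown later; the rest of this sketch is illustrative and is not the actual Spark implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// A converter receives an updater from its parent and pushes each finished value into it,
// so it never needs to know how (or where) the parent stores converted values.
trait ParentContainerUpdater {
  def set(value: Any): Unit = ()
}

// Root converters have no parent container, hence a no-op updater; the new
// RowRecordMaterializer below passes a `NoopUpdater` for exactly this reason.
object NoopUpdater extends ParentContainerUpdater

// An array-element converter, for example, could be handed an updater that appends
// converted elements to the parent's buffer.
class AppendToBufferUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
  override def set(value: Any): Unit = buffer += value
}
```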


@@ -17,14 +17,17 @@
package org.apache.spark.sql.parquet

import java.nio.{ByteBuffer, ByteOrder}
import java.util
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport, WriteSupport}
import org.apache.parquet.io.api._
import org.apache.parquet.schema.MessageType
@@ -36,87 +39,133 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * A [[RecordMaterializer]] for Catalyst rows.
 *
 * @param parquetSchema Parquet schema of the records to be read
 * @param catalystSchema Catalyst schema of the rows to be constructed
 */
private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, catalystSchema: StructType)
  extends RecordMaterializer[InternalRow] {

  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)

  override def getCurrentRecord: InternalRow = rootConverter.currentRow

  override def getRootConverter: GroupConverter = rootConverter
}

private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging {
  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: util.Map[String, String],
      fileSchema: MessageType,
      readContext: ReadContext): RecordMaterializer[InternalRow] = {
    log.debug(s"Preparing to read Parquet file with message type: $fileSchema")

    val toCatalyst = new CatalystSchemaConverter(conf)
    val parquetRequestedSchema = readContext.getRequestedSchema

    val catalystRequestedSchema =
      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
        metadata
          // First tries to read requested schema, which may result from projections
          .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
          // If not available, tries to read Catalyst schema from file metadata.  It's only
          // available if the target file is written by Spark SQL.
          .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
      }.map(StructType.fromString).getOrElse {
        logDebug("Catalyst schema not available, falling back to Parquet schema")
        toCatalyst.convert(parquetRequestedSchema)
      }

    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
    new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
  }

  override def init(context: InitContext): ReadContext = {
    val conf = context.getConfiguration

    // If the target file was written by Spark SQL, we should be able to find a serialized
    // Catalyst schema of this file from its metadata.
    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))

    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
    // `StructType` containing all requested columns.
    val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

    // Below we construct a Parquet schema containing all requested columns.  This schema tells
    // Parquet which columns to read.
    //
    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema.  Otherwise,
    // we have to fall back to the full file schema, which contains all columns in the file.
    // Obviously this may waste IO bandwidth since it may read more columns than requested.
    //
    // Two things to note:
    //
    // 1. It's possible that some requested columns don't exist in the target Parquet file.  For
    //    example, in the case of schema merging, the globally merged schema may contain extra
    //    columns gathered from other Parquet files.  These columns will simply be filled with
    //    nulls when actually reading the target Parquet file.
    //
    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema
    //    to a Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique
    //    due to non-standard behaviors of some Parquet libraries/tools.  For example, a Parquet
    //    file containing a single integer array field `f1` may have the following legacy
    //    2-level structure:
    //
    //      message root {
    //        optional group f1 (LIST) {
    //          required INT32 element;
    //        }
    //      }
    //
    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
    //
    //      message root {
    //        optional group f1 (LIST) {
    //          repeated group list {
    //            required INT32 element;
    //          }
    //        }
    //      }
    //
    //    Apparently, we can't use the 2nd schema to read the target Parquet file, as the two
    //    have different physical structures.
    val parquetRequestedSchema =
      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
        val toParquet = new CatalystSchemaConverter(conf)
        val fileSchema = context.getFileSchema.asGroupType()
        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet

        StructType
          // Deserializes the Catalyst schema of requested columns
          .fromString(schemaString)
          .map { field =>
            if (fileFieldNames.contains(field.name)) {
              // If the field exists in the target Parquet file, extracts the field type from the
              // full file schema and makes a single-field Parquet schema
              new MessageType("root", fileSchema.getType(field.name))
            } else {
              // Otherwise, just resorts to `CatalystSchemaConverter`
              toParquet.convert(StructType(Array(field)))
            }
          }
          // Merges all single-field Parquet schemas to form a complete schema for all requested
          // columns.  Note that it's possible that no columns are requested at all (e.g., when
          // counting some partition column of a partitioned Parquet table).  That's why `fold`
          // is used here and we always fall back to an empty Parquet schema.
          .fold(new MessageType("root")) {
            _ union _
          }
      }

    val metadata =
      Map.empty[String, String] ++
        maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
    new ReadContext(parquetRequestedSchema, metadata)
  }
}
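The `.fold(new MessageType("root")) { _ union _ }` step above is what merges the per-column schemas. As a minimal, hedged illustration (not part of this PR), the snippet below uses parquet-mr's `MessageTypeParser` to show how two single-field `MessageType`s union into one requested schema; the field names and types are made up for the example.

```scala
import org.apache.parquet.schema.MessageTypeParser

// Two single-field schemas, one per requested column (illustrative types and names).
val idSchema = MessageTypeParser.parseMessageType(
  "message root { required int32 id; }")
val nameSchema = MessageTypeParser.parseMessageType(
  "message root { optional binary name (UTF8); }")

// `union` merges the field lists, mirroring the fold over single-field schemas in `init()`.
val requested = idSchema.union(nameSchema)

// Prints something like:
//   message root {
//     required int32 id;
//     optional binary name (UTF8);
//   }
println(requested)
```

The resulting `message root` containing both fields is the shape handed to `ReadContext` above.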


@@ -259,6 +259,10 @@ private[sql] class ParquetRelation2(
      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

    // Create the function to set variable Parquet confs at both driver and executor side.
    val initLocalJobFuncOpt =
      ParquetRelation2.initializeLocalJobFunc(
@@ -266,7 +270,11 @@ private[sql] class ParquetRelation2(
        filters,
        dataSchema,
        useMetadataCache,
        parquetFilterPushDown,
        assumeBinaryIsString,
        assumeInt96IsTimestamp,
        followParquetFormatSpec) _

    // Create the function to set input paths at the driver side.
    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
@@ -471,9 +479,12 @@ private[sql] object ParquetRelation2 extends Logging {
      filters: Array[Filter],
      dataSchema: StructType,
      useMetadataCache: Boolean,
      parquetFilterPushDown: Boolean,
      assumeBinaryIsString: Boolean,
      assumeInt96IsTimestamp: Boolean,
      followParquetFormatSpec: Boolean)(job: Job): Unit = {
    val conf = job.getConfiguration
    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName)

    // Try to push down filters when filter push-down is enabled.
    if (parquetFilterPushDown) {
@@ -497,6 +508,11 @@ private[sql] object ParquetRelation2 extends Logging {
    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)

    // Sets flags for Parquet schema conversion
    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
  }

  /** This closure sets input paths at the driver side. */
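Because these flags are written into the Hadoop `Configuration`, the read side can pick them up from the task configuration. A small hedged sketch (the helper name and default values are illustrative, not Spark's):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLConf

// Reads back the schema-conversion flags set by initializeLocalJobFunc above.
// Key names come from the diff; the fallback defaults here are only for illustration.
def schemaConversionFlags(conf: Configuration): (Boolean, Boolean, Boolean) = (
  conf.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, false),
  conf.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true),
  conf.getBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, false))
```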


@ -0,0 +1,33 @@
# Notes for Parquet compatibility tests
The following directories and files are used for Parquet compatibility tests:
```
.
├── README.md # This file
├── avro
│   ├── parquet-compat.avdl # Testing Avro IDL
│   └── parquet-compat.avpr # !! NO TOUCH !! Protocol file generated from parquet-compat.avdl
├── gen-java # !! NO TOUCH !! Generated Java code
├── scripts
│   └── gen-code.sh # Script used to generate Java code for Thrift and Avro
└── thrift
└── parquet-compat.thrift # Testing Thrift schema
```
The generated Java code is used in the following test suites:
- `org.apache.spark.sql.parquet.ParquetAvroCompatibilitySuite`
- `org.apache.spark.sql.parquet.ParquetThriftCompatibilitySuite`
To avoid code generation at build time, the Java code generated from the testing Thrift schema and Avro IDL is also checked in.
When updating the testing Thrift schema and Avro IDL, please run `gen-code.sh` to update all the generated Java code.
## Prerequisites
Please ensure `avro-tools` and `thrift` are installed. You may install these two on Mac OS X via:
```bash
$ brew install thrift avro-tools
```


@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This is a test protocol for testing parquet-avro compatibility.
@namespace("org.apache.spark.sql.parquet.test.avro")
protocol CompatibilityTest {
record Nested {
array<int> nested_ints_column;
string nested_string_column;
}
record ParquetAvroCompat {
boolean bool_column;
int int_column;
long long_column;
float float_column;
double double_column;
bytes binary_column;
string string_column;
union { null, boolean } maybe_bool_column;
union { null, int } maybe_int_column;
union { null, long } maybe_long_column;
union { null, float } maybe_float_column;
union { null, double } maybe_double_column;
union { null, bytes } maybe_binary_column;
union { null, string } maybe_string_column;
array<string> strings_column;
map<int> string_to_int_column;
map<array<Nested>> complex_column;
}
}


@ -0,0 +1,86 @@
{
"protocol" : "CompatibilityTest",
"namespace" : "org.apache.spark.sql.parquet.test.avro",
"types" : [ {
"type" : "record",
"name" : "Nested",
"fields" : [ {
"name" : "nested_ints_column",
"type" : {
"type" : "array",
"items" : "int"
}
}, {
"name" : "nested_string_column",
"type" : "string"
} ]
}, {
"type" : "record",
"name" : "ParquetAvroCompat",
"fields" : [ {
"name" : "bool_column",
"type" : "boolean"
}, {
"name" : "int_column",
"type" : "int"
}, {
"name" : "long_column",
"type" : "long"
}, {
"name" : "float_column",
"type" : "float"
}, {
"name" : "double_column",
"type" : "double"
}, {
"name" : "binary_column",
"type" : "bytes"
}, {
"name" : "string_column",
"type" : "string"
}, {
"name" : "maybe_bool_column",
"type" : [ "null", "boolean" ]
}, {
"name" : "maybe_int_column",
"type" : [ "null", "int" ]
}, {
"name" : "maybe_long_column",
"type" : [ "null", "long" ]
}, {
"name" : "maybe_float_column",
"type" : [ "null", "float" ]
}, {
"name" : "maybe_double_column",
"type" : [ "null", "double" ]
}, {
"name" : "maybe_binary_column",
"type" : [ "null", "bytes" ]
}, {
"name" : "maybe_string_column",
"type" : [ "null", "string" ]
}, {
"name" : "strings_column",
"type" : {
"type" : "array",
"items" : "string"
}
}, {
"name" : "string_to_int_column",
"type" : {
"type" : "map",
"values" : "int"
}
}, {
"name" : "complex_column",
"type" : {
"type" : "map",
"values" : {
"type" : "array",
"items" : "Nested"
}
}
} ]
} ],
"messages" : { }
}


@ -0,0 +1,17 @@
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public interface CompatibilityTest {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"types\":[{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}");
@SuppressWarnings("all")
public interface Callback extends CompatibilityTest {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.spark.sql.parquet.test.avro.CompatibilityTest.PROTOCOL;
}
}


@ -0,0 +1,196 @@
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Nested extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Nested\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.util.List<java.lang.Integer> nested_ints_column;
@Deprecated public java.lang.String nested_string_column;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Nested() {}
/**
* All-args constructor.
*/
public Nested(java.util.List<java.lang.Integer> nested_ints_column, java.lang.String nested_string_column) {
this.nested_ints_column = nested_ints_column;
this.nested_string_column = nested_string_column;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return nested_ints_column;
case 1: return nested_string_column;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: nested_ints_column = (java.util.List<java.lang.Integer>)value$; break;
case 1: nested_string_column = (java.lang.String)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'nested_ints_column' field.
*/
public java.util.List<java.lang.Integer> getNestedIntsColumn() {
return nested_ints_column;
}
/**
* Sets the value of the 'nested_ints_column' field.
* @param value the value to set.
*/
public void setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
this.nested_ints_column = value;
}
/**
* Gets the value of the 'nested_string_column' field.
*/
public java.lang.String getNestedStringColumn() {
return nested_string_column;
}
/**
* Sets the value of the 'nested_string_column' field.
* @param value the value to set.
*/
public void setNestedStringColumn(java.lang.String value) {
this.nested_string_column = value;
}
/** Creates a new Nested RecordBuilder */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder() {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder();
}
/** Creates a new Nested RecordBuilder by copying an existing Builder */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
}
/** Creates a new Nested RecordBuilder by copying an existing Nested instance */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested other) {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
}
/**
* RecordBuilder for Nested instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Nested>
implements org.apache.avro.data.RecordBuilder<Nested> {
private java.util.List<java.lang.Integer> nested_ints_column;
private java.lang.String nested_string_column;
/** Creates a new Builder */
private Builder() {
super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
super(other);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.nested_string_column)) {
this.nested_string_column = data().deepCopy(fields()[1].schema(), other.nested_string_column);
fieldSetFlags()[1] = true;
}
}
/** Creates a Builder by copying an existing Nested instance */
private Builder(org.apache.spark.sql.parquet.test.avro.Nested other) {
super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.nested_string_column)) {
this.nested_string_column = data().deepCopy(fields()[1].schema(), other.nested_string_column);
fieldSetFlags()[1] = true;
}
}
/** Gets the value of the 'nested_ints_column' field */
public java.util.List<java.lang.Integer> getNestedIntsColumn() {
return nested_ints_column;
}
/** Sets the value of the 'nested_ints_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
validate(fields()[0], value);
this.nested_ints_column = value;
fieldSetFlags()[0] = true;
return this;
}
/** Checks whether the 'nested_ints_column' field has been set */
public boolean hasNestedIntsColumn() {
return fieldSetFlags()[0];
}
/** Clears the value of the 'nested_ints_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedIntsColumn() {
nested_ints_column = null;
fieldSetFlags()[0] = false;
return this;
}
/** Gets the value of the 'nested_string_column' field */
public java.lang.String getNestedStringColumn() {
return nested_string_column;
}
/** Sets the value of the 'nested_string_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedStringColumn(java.lang.String value) {
validate(fields()[1], value);
this.nested_string_column = value;
fieldSetFlags()[1] = true;
return this;
}
/** Checks whether the 'nested_string_column' field has been set */
public boolean hasNestedStringColumn() {
return fieldSetFlags()[1];
}
/** Clears the value of the 'nested_string_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedStringColumn() {
nested_string_column = null;
fieldSetFlags()[1] = false;
return this;
}
@Override
public Nested build() {
try {
Nested record = new Nested();
record.nested_ints_column = fieldSetFlags()[0] ? this.nested_ints_column : (java.util.List<java.lang.Integer>) defaultValue(fields()[0]);
record.nested_string_column = fieldSetFlags()[1] ? this.nested_string_column : (java.lang.String) defaultValue(fields()[1]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}


@ -0,0 +1,541 @@
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.spark.sql.parquet.test.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.annotation.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-7")
public class Nested implements org.apache.thrift.TBase<Nested, Nested._Fields>, java.io.Serializable, Cloneable, Comparable<Nested> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Nested");
private static final org.apache.thrift.protocol.TField NESTED_INTS_COLUMN_FIELD_DESC = new org.apache.thrift.protocol.TField("nestedIntsColumn", org.apache.thrift.protocol.TType.LIST, (short)1);
private static final org.apache.thrift.protocol.TField NESTED_STRING_COLUMN_FIELD_DESC = new org.apache.thrift.protocol.TField("nestedStringColumn", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new NestedStandardSchemeFactory());
schemes.put(TupleScheme.class, new NestedTupleSchemeFactory());
}
public List<Integer> nestedIntsColumn; // required
public String nestedStringColumn; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
NESTED_INTS_COLUMN((short)1, "nestedIntsColumn"),
NESTED_STRING_COLUMN((short)2, "nestedStringColumn");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // NESTED_INTS_COLUMN
return NESTED_INTS_COLUMN;
case 2: // NESTED_STRING_COLUMN
return NESTED_STRING_COLUMN;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.NESTED_INTS_COLUMN, new org.apache.thrift.meta_data.FieldMetaData("nestedIntsColumn", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))));
tmpMap.put(_Fields.NESTED_STRING_COLUMN, new org.apache.thrift.meta_data.FieldMetaData("nestedStringColumn", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Nested.class, metaDataMap);
}
public Nested() {
}
public Nested(
List<Integer> nestedIntsColumn,
String nestedStringColumn)
{
this();
this.nestedIntsColumn = nestedIntsColumn;
this.nestedStringColumn = nestedStringColumn;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public Nested(Nested other) {
if (other.isSetNestedIntsColumn()) {
List<Integer> __this__nestedIntsColumn = new ArrayList<Integer>(other.nestedIntsColumn);
this.nestedIntsColumn = __this__nestedIntsColumn;
}
if (other.isSetNestedStringColumn()) {
this.nestedStringColumn = other.nestedStringColumn;
}
}
public Nested deepCopy() {
return new Nested(this);
}
@Override
public void clear() {
this.nestedIntsColumn = null;
this.nestedStringColumn = null;
}
public int getNestedIntsColumnSize() {
return (this.nestedIntsColumn == null) ? 0 : this.nestedIntsColumn.size();
}
public java.util.Iterator<Integer> getNestedIntsColumnIterator() {
return (this.nestedIntsColumn == null) ? null : this.nestedIntsColumn.iterator();
}
public void addToNestedIntsColumn(int elem) {
if (this.nestedIntsColumn == null) {
this.nestedIntsColumn = new ArrayList<Integer>();
}
this.nestedIntsColumn.add(elem);
}
public List<Integer> getNestedIntsColumn() {
return this.nestedIntsColumn;
}
public Nested setNestedIntsColumn(List<Integer> nestedIntsColumn) {
this.nestedIntsColumn = nestedIntsColumn;
return this;
}
public void unsetNestedIntsColumn() {
this.nestedIntsColumn = null;
}
/** Returns true if field nestedIntsColumn is set (has been assigned a value) and false otherwise */
public boolean isSetNestedIntsColumn() {
return this.nestedIntsColumn != null;
}
public void setNestedIntsColumnIsSet(boolean value) {
if (!value) {
this.nestedIntsColumn = null;
}
}
public String getNestedStringColumn() {
return this.nestedStringColumn;
}
public Nested setNestedStringColumn(String nestedStringColumn) {
this.nestedStringColumn = nestedStringColumn;
return this;
}
public void unsetNestedStringColumn() {
this.nestedStringColumn = null;
}
/** Returns true if field nestedStringColumn is set (has been assigned a value) and false otherwise */
public boolean isSetNestedStringColumn() {
return this.nestedStringColumn != null;
}
public void setNestedStringColumnIsSet(boolean value) {
if (!value) {
this.nestedStringColumn = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case NESTED_INTS_COLUMN:
if (value == null) {
unsetNestedIntsColumn();
} else {
setNestedIntsColumn((List<Integer>)value);
}
break;
case NESTED_STRING_COLUMN:
if (value == null) {
unsetNestedStringColumn();
} else {
setNestedStringColumn((String)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case NESTED_INTS_COLUMN:
return getNestedIntsColumn();
case NESTED_STRING_COLUMN:
return getNestedStringColumn();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case NESTED_INTS_COLUMN:
return isSetNestedIntsColumn();
case NESTED_STRING_COLUMN:
return isSetNestedStringColumn();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof Nested)
return this.equals((Nested)that);
return false;
}
public boolean equals(Nested that) {
if (that == null)
return false;
boolean this_present_nestedIntsColumn = true && this.isSetNestedIntsColumn();
boolean that_present_nestedIntsColumn = true && that.isSetNestedIntsColumn();
if (this_present_nestedIntsColumn || that_present_nestedIntsColumn) {
if (!(this_present_nestedIntsColumn && that_present_nestedIntsColumn))
return false;
if (!this.nestedIntsColumn.equals(that.nestedIntsColumn))
return false;
}
boolean this_present_nestedStringColumn = true && this.isSetNestedStringColumn();
boolean that_present_nestedStringColumn = true && that.isSetNestedStringColumn();
if (this_present_nestedStringColumn || that_present_nestedStringColumn) {
if (!(this_present_nestedStringColumn && that_present_nestedStringColumn))
return false;
if (!this.nestedStringColumn.equals(that.nestedStringColumn))
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_nestedIntsColumn = true && (isSetNestedIntsColumn());
list.add(present_nestedIntsColumn);
if (present_nestedIntsColumn)
list.add(nestedIntsColumn);
boolean present_nestedStringColumn = true && (isSetNestedStringColumn());
list.add(present_nestedStringColumn);
if (present_nestedStringColumn)
list.add(nestedStringColumn);
return list.hashCode();
}
@Override
public int compareTo(Nested other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetNestedIntsColumn()).compareTo(other.isSetNestedIntsColumn());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetNestedIntsColumn()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nestedIntsColumn, other.nestedIntsColumn);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetNestedStringColumn()).compareTo(other.isSetNestedStringColumn());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetNestedStringColumn()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nestedStringColumn, other.nestedStringColumn);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("Nested(");
boolean first = true;
sb.append("nestedIntsColumn:");
if (this.nestedIntsColumn == null) {
sb.append("null");
} else {
sb.append(this.nestedIntsColumn);
}
first = false;
if (!first) sb.append(", ");
sb.append("nestedStringColumn:");
if (this.nestedStringColumn == null) {
sb.append("null");
} else {
sb.append(this.nestedStringColumn);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
if (nestedIntsColumn == null) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'nestedIntsColumn' was not present! Struct: " + toString());
}
if (nestedStringColumn == null) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'nestedStringColumn' was not present! Struct: " + toString());
}
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class NestedStandardSchemeFactory implements SchemeFactory {
public NestedStandardScheme getScheme() {
return new NestedStandardScheme();
}
}
private static class NestedStandardScheme extends StandardScheme<Nested> {
public void read(org.apache.thrift.protocol.TProtocol iprot, Nested struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // NESTED_INTS_COLUMN
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
struct.nestedIntsColumn = new ArrayList<Integer>(_list0.size);
int _elem1;
for (int _i2 = 0; _i2 < _list0.size; ++_i2)
{
_elem1 = iprot.readI32();
struct.nestedIntsColumn.add(_elem1);
}
iprot.readListEnd();
}
struct.setNestedIntsColumnIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // NESTED_STRING_COLUMN
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.nestedStringColumn = iprot.readString();
struct.setNestedStringColumnIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, Nested struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.nestedIntsColumn != null) {
oprot.writeFieldBegin(NESTED_INTS_COLUMN_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, struct.nestedIntsColumn.size()));
for (int _iter3 : struct.nestedIntsColumn)
{
oprot.writeI32(_iter3);
}
oprot.writeListEnd();
}
oprot.writeFieldEnd();
}
if (struct.nestedStringColumn != null) {
oprot.writeFieldBegin(NESTED_STRING_COLUMN_FIELD_DESC);
oprot.writeString(struct.nestedStringColumn);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class NestedTupleSchemeFactory implements SchemeFactory {
public NestedTupleScheme getScheme() {
return new NestedTupleScheme();
}
}
private static class NestedTupleScheme extends TupleScheme<Nested> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, Nested struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
{
oprot.writeI32(struct.nestedIntsColumn.size());
for (int _iter4 : struct.nestedIntsColumn)
{
oprot.writeI32(_iter4);
}
}
oprot.writeString(struct.nestedStringColumn);
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, Nested struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
{
org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32());
struct.nestedIntsColumn = new ArrayList<Integer>(_list5.size);
int _elem6;
for (int _i7 = 0; _i7 < _list5.size; ++_i7)
{
_elem6 = iprot.readI32();
struct.nestedIntsColumn.add(_elem6);
}
}
struct.setNestedIntsColumnIsSet(true);
struct.nestedStringColumn = iprot.readString();
struct.setNestedStringColumnIsSet(true);
}
}
}


@ -0,0 +1,51 @@
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.spark.sql.parquet.test.thrift;
import java.util.Map;
import java.util.HashMap;
import org.apache.thrift.TEnum;
public enum Suit implements org.apache.thrift.TEnum {
SPADES(0),
HEARTS(1),
DIAMONDS(2),
CLUBS(3);
private final int value;
private Suit(int value) {
this.value = value;
}
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
public int getValue() {
return value;
}
/**
* Find a the enum type by its integer value, as defined in the Thrift IDL.
* @return null if the value is not found.
*/
public static Suit findByValue(int value) {
switch (value) {
case 0:
return SPADES;
case 1:
return HEARTS;
case 2:
return DIAMONDS;
case 3:
return CLUBS;
default:
return null;
}
}
}


@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}
class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
import ParquetCompatibilityTest._
override val sqlContext: SQLContext = TestSQLContext
override protected def beforeAll(): Unit = {
super.beforeAll()
val writer =
new AvroParquetWriter[ParquetAvroCompat](
new Path(parquetStore.getCanonicalPath),
ParquetAvroCompat.getClassSchema)
(0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
writer.close()
}
test("Read Parquet file generated by parquet-avro") {
logInfo(
s"""Schema of the Parquet file written by parquet-avro:
|${readParquetSchema(parquetStore.getCanonicalPath)}
""".stripMargin)
checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
Row(
i % 2 == 0,
i,
i.toLong * 10,
i.toFloat + 0.1f,
i.toDouble + 0.2d,
s"val_$i".getBytes,
s"val_$i",
nullable(i % 2 == 0: java.lang.Boolean),
nullable(i: Integer),
nullable(i.toLong: java.lang.Long),
nullable(i.toFloat + 0.1f: java.lang.Float),
nullable(i.toDouble + 0.2d: java.lang.Double),
nullable(s"val_$i".getBytes),
nullable(s"val_$i"),
Seq.tabulate(3)(n => s"arr_${i + n}"),
Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap,
Seq.tabulate(3) { n =>
(i + n).toString -> Seq.tabulate(3) { m =>
Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
}
}.toMap)
})
}
def makeParquetAvroCompat(i: Int): ParquetAvroCompat = {
def nullable[T <: AnyRef] = makeNullable[T](i) _
def makeComplexColumn(i: Int): JMap[String, JList[Nested]] = {
mapAsJavaMap(Seq.tabulate(3) { n =>
(i + n).toString -> seqAsJavaList(Seq.tabulate(3) { m =>
Nested
.newBuilder()
.setNestedIntsColumn(seqAsJavaList(Seq.tabulate(3)(j => i + j + m)))
.setNestedStringColumn(s"val_${i + m}")
.build()
})
}.toMap)
}
ParquetAvroCompat
.newBuilder()
.setBoolColumn(i % 2 == 0)
.setIntColumn(i)
.setLongColumn(i.toLong * 10)
.setFloatColumn(i.toFloat + 0.1f)
.setDoubleColumn(i.toDouble + 0.2d)
.setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
.setStringColumn(s"val_$i")
.setMaybeBoolColumn(nullable(i % 2 == 0: java.lang.Boolean))
.setMaybeIntColumn(nullable(i: Integer))
.setMaybeLongColumn(nullable(i.toLong: java.lang.Long))
.setMaybeFloatColumn(nullable(i.toFloat + 0.1f: java.lang.Float))
.setMaybeDoubleColumn(nullable(i.toDouble + 0.2d: java.lang.Double))
.setMaybeBinaryColumn(nullable(ByteBuffer.wrap(s"val_$i".getBytes)))
.setMaybeStringColumn(nullable(s"val_$i"))
.setStringsColumn(Seq.tabulate(3)(n => s"arr_${i + n}"))
.setStringToIntColumn(
mapAsJavaMap(Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap))
.setComplexColumn(makeComplexColumn(i))
.build()
}
}


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.io.File
import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.QueryTest
import org.apache.spark.util.Utils
abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
protected var parquetStore: File = _
override protected def beforeAll(): Unit = {
parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_")
parquetStore.delete()
}
override protected def afterAll(): Unit = {
Utils.deleteRecursively(parquetStore)
}
def readParquetSchema(path: String): MessageType = {
val fsPath = new Path(path)
val fs = fsPath.getFileSystem(configuration)
val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot(_.getPath.getName.startsWith("_"))
val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
footers.head.getParquetMetadata.getFileMetaData.getSchema
}
}
object ParquetCompatibilityTest {
def makeNullable[T <: AnyRef](i: Int)(f: => T): T = {
if (i % 3 == 0) null.asInstanceOf[T] else f
}
}
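For reference, a quick usage sketch of `makeNullable`: values generated for row indices divisible by 3 come back as null, which is why every third expected row in the compatibility suites contains nulls. The values below are made up for the example.

```scala
import ParquetCompatibilityTest.makeNullable

// Row index 3: 3 % 3 == 0, so the generated value is replaced with null.
val atRow3: String = makeNullable[String](3)("val_3")   // null

// Row index 4: 4 % 3 != 0, so the generated value is returned unchanged.
val atRow4: String = makeNullable[String](4)("val_4")   // "val_4"
```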


@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parquet
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.thrift.ThriftParquetWriter
import org.apache.spark.sql.parquet.test.thrift.{Nested, ParquetThriftCompat, Suit}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}
class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
import ParquetCompatibilityTest._
override val sqlContext: SQLContext = TestSQLContext
override protected def beforeAll(): Unit = {
super.beforeAll()
val writer =
new ThriftParquetWriter[ParquetThriftCompat](
new Path(parquetStore.getCanonicalPath),
classOf[ParquetThriftCompat],
CompressionCodecName.SNAPPY)
(0 until 10).foreach(i => writer.write(makeParquetThriftCompat(i)))
writer.close()
}
test("Read Parquet file generated by parquet-thrift") {
logInfo(
s"""Schema of the Parquet file written by parquet-thrift:
|${readParquetSchema(parquetStore.getCanonicalPath)}
""".stripMargin)
checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
Row(
i % 2 == 0,
i.toByte,
(i + 1).toShort,
i + 2,
i.toLong * 10,
i.toDouble + 0.2d,
// Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
// treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
// Thrift `STRING`s are encoded using UTF-8.
s"val_$i",
s"val_$i",
// Thrift ENUM values are converted to Parquet binaries containing UTF-8 strings
Suit.values()(i % 4).name(),
nullable(i % 2 == 0: java.lang.Boolean),
nullable(i.toByte: java.lang.Byte),
nullable((i + 1).toShort: java.lang.Short),
nullable(i + 2: Integer),
nullable((i * 10).toLong: java.lang.Long),
nullable(i.toDouble + 0.2d: java.lang.Double),
nullable(s"val_$i"),
nullable(s"val_$i"),
nullable(Suit.values()(i % 4).name()),
Seq.tabulate(3)(n => s"arr_${i + n}"),
// Thrift `SET`s are converted to Parquet `LIST`s
Seq(i),
Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap,
Seq.tabulate(3) { n =>
(i + n) -> Seq.tabulate(3) { m =>
Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
}
}.toMap)
})
}
def makeParquetThriftCompat(i: Int): ParquetThriftCompat = {
def makeComplexColumn(i: Int): JMap[Integer, JList[Nested]] = {
mapAsJavaMap(Seq.tabulate(3) { n =>
(i + n: Integer) -> seqAsJavaList(Seq.tabulate(3) { m =>
new Nested(
seqAsJavaList(Seq.tabulate(3)(j => i + j + m)),
s"val_${i + m}")
})
}.toMap)
}
val value =
new ParquetThriftCompat(
i % 2 == 0,
i.toByte,
(i + 1).toShort,
i + 2,
i.toLong * 10,
i.toDouble + 0.2d,
ByteBuffer.wrap(s"val_$i".getBytes),
s"val_$i",
Suit.values()(i % 4),
seqAsJavaList(Seq.tabulate(3)(n => s"arr_${i + n}")),
setAsJavaSet(Set(i)),
mapAsJavaMap(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap),
makeComplexColumn(i))
if (i % 3 == 0) {
value
} else {
value
.setMaybeBoolColumn(i % 2 == 0)
.setMaybeByteColumn(i.toByte)
.setMaybeShortColumn((i + 1).toShort)
.setMaybeIntColumn(i + 2)
.setMaybeLongColumn(i.toLong * 10)
.setMaybeDoubleColumn(i.toDouble + 0.2d)
.setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
.setMaybeStringColumn(s"val_$i")
.setMaybeEnumColumn(Suit.values()(i % 4))
}
}
}


@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cd $(dirname $0)/..
BASEDIR=`pwd`
cd -
rm -rf $BASEDIR/gen-java
mkdir -p $BASEDIR/gen-java
thrift \
  --gen java \
  -out $BASEDIR/gen-java \
  $BASEDIR/thrift/parquet-compat.thrift
avro-tools idl $BASEDIR/avro/parquet-compat.avdl > $BASEDIR/avro/parquet-compat.avpr
avro-tools compile -string protocol $BASEDIR/avro/parquet-compat.avpr $BASEDIR/gen-java


@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace java org.apache.spark.sql.parquet.test.thrift
enum Suit {
SPADES,
HEARTS,
DIAMONDS,
CLUBS
}
struct Nested {
1: required list<i32> nestedIntsColumn;
2: required string nestedStringColumn;
}
/**
* This is a test struct for testing parquet-thrift compatibility.
*/
struct ParquetThriftCompat {
1: required bool boolColumn;
2: required byte byteColumn;
3: required i16 shortColumn;
4: required i32 intColumn;
5: required i64 longColumn;
6: required double doubleColumn;
7: required binary binaryColumn;
8: required string stringColumn;
9: required Suit enumColumn
10: optional bool maybeBoolColumn;
11: optional byte maybeByteColumn;
12: optional i16 maybeShortColumn;
13: optional i32 maybeIntColumn;
14: optional i64 maybeLongColumn;
15: optional double maybeDoubleColumn;
16: optional binary maybeBinaryColumn;
17: optional string maybeStringColumn;
18: optional Suit maybeEnumColumn;
19: required list<string> stringsColumn;
20: required set<i32> intSetColumn;
21: required map<i32, string> intToStringColumn;
22: required map<i32, list<Nested>> complexColumn;
}


@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
  import ParquetCompatibilityTest.makeNullable

  override val sqlContext: SQLContext = TestHive

  override protected def beforeAll(): Unit = {
    super.beforeAll()

    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
      withTempTable("data") {
        sqlContext.sql(
          s"""CREATE TABLE parquet_compat(
             | bool_column BOOLEAN,
             | byte_column TINYINT,
             | short_column SMALLINT,
             | int_column INT,
             | long_column BIGINT,
             | float_column FLOAT,
             | double_column DOUBLE,
             |
             | strings_column ARRAY<STRING>,
             | int_to_string_column MAP<INT, STRING>
             |)
             |STORED AS PARQUET
             |LOCATION '${parquetStore.getCanonicalPath}'
           """.stripMargin)

        val schema = sqlContext.table("parquet_compat").schema
        val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
        sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
      }
    }
  }

  override protected def afterAll(): Unit = {
    sqlContext.sql("DROP TABLE parquet_compat")
  }

  test("Read Parquet file generated by parquet-hive") {
    logInfo(
      s"""Schema of the Parquet file written by parquet-hive:
         |${readParquetSchema(parquetStore.getCanonicalPath)}
       """.stripMargin)

    // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
    // Have to assume all BINARY values are strings here.
    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
      checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
    }
  }

  def makeRows: Seq[Row] = {
    (0 until 10).map { i =>
      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)

      Row(
        nullable(i % 2 == 0: java.lang.Boolean),
        nullable(i.toByte: java.lang.Byte),
        nullable((i + 1).toShort: java.lang.Short),
        nullable(i + 2: Integer),
        nullable(i.toLong * 10: java.lang.Long),
        nullable(i.toFloat + 0.1f: java.lang.Float),
        nullable(i.toDouble + 0.2d: java.lang.Double),
        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
    }
  }
}
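
The `nullable` helper above curries a row index into `makeNullable`, which is defined in `ParquetCompatibilityTest` and not shown in this excerpt. A minimal sketch of the shape such a helper needs to have is below; the exact null-injection rule is an assumption made for illustration only.

```scala
// Sketch only; the real definition lives in ParquetCompatibilityTest.
// Assumption: null out values for a deterministic subset of row indices so that
// nullable columns are exercised. The `i % 3 == 0` rule is illustrative.
def makeNullable[T <: AnyRef](i: Int)(f: => T): T =
  if (i % 3 == 0) null.asInstanceOf[T] else f
```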


@@ -21,14 +21,16 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLConf, SaveMode}
import org.apache.spark.util.Utils

// The data where the partitioning key exists only in the directory structure.
@@ -685,6 +687,31 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
    sql("drop table spark_6016_fix")
  }

  test("SPARK-8811: compatibility with array of struct in Hive") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      withTable("array_of_struct") {
        val conf = Seq(
          HiveContext.CONVERT_METASTORE_PARQUET.key -> "false",
          SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
          SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true")

        withSQLConf(conf: _*) {
          sql(
            s"""CREATE TABLE array_of_struct
               |STORED AS PARQUET LOCATION '$path'
               |AS SELECT '1st', '2nd', ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b'))
             """.stripMargin)

          checkAnswer(
            sqlContext.read.parquet(path),
            Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
        }
      }
    }
  }
}

class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
@@ -762,7 +789,9 @@ class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
/**
 * A collection of tests for parquet data with various forms of partitioning.
 */
abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
  override def sqlContext: SQLContext = TestHive

  var partitionedTableDir: File = null
  var normalTableDir: File = null
  var partitionedTableDirWithKey: File = null