[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits

`ReadContext.init` calls `InitContext.getMergedKeyValueMetadata`, which doesn't know how to merge conflicting user-defined key-value metadata and throws an exception. In our case, when dealing with different but compatible schemas, we have different Spark SQL schema JSON strings in different Parquet part-files, which causes this problem. Reading similar Parquet files generated by Hive doesn't suffer from this issue.

In this PR, we manually merge the schemas before passing them to `ReadContext` to avoid the exception.
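To make "different but compatible" concrete, here is a minimal standalone sketch (not part of the patch; the object name `SchemaMergeSketch` and the hand-rolled field union are illustrative only), using the two schemas from the new test case below. The actual change relies on `StructType`'s own merge logic via `reduce(_ merge _)`, as shown in the diff.

```scala
import org.apache.spark.sql.types._

object SchemaMergeSketch extends App {
  // Schemas of the two part-files written by the new test case: the second
  // part-file adds a nullable stringField column.
  val schemaA = StructType(Seq(
    StructField("intField", IntegerType, nullable = false)))
  val schemaB = StructType(Seq(
    StructField("intField", IntegerType, nullable = false),
    StructField("stringField", StringType, nullable = true)))

  // Each part-file stores its own schema JSON under the Spark metadata key,
  // so the per-file values differ even though the schemas are compatible.
  assert(schemaA.json != schemaB.json)

  // Simplified stand-in for the merge performed in the patch: a field union
  // that keeps schemaA's fields first.
  val merged = StructType(
    schemaA.fields ++ schemaB.fields.filterNot(f => schemaA.fieldNames.contains(f.name)))

  // A single merged JSON string like this is what ends up in the global
  // key-value metadata before ReadContext initialization.
  println(merged.json)

  // Round-trip check: the JSON parses back to the same StructType.
  assert(DataType.fromJson(merged.json) == merged)
}
```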


Author: Cheng Lian <lian@databricks.com>

Closes #4768 from liancheng/spark-6010 and squashes the following commits:

9002f0a [Cheng Lian] Fixes SPARK-6010
Authored by Cheng Lian on 2015-02-25 15:15:22 -08:00; committed by Michael Armbrust
parent f3f4c87b3d
commit e0fdd467e2
3 changed files with 45 additions and 1 deletion


@@ -48,6 +48,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
@@ -459,13 +460,30 @@ private[parquet] class FilteringParquetRowInputFormat
val getGlobalMetaData =
classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
getGlobalMetaData.setAccessible(true)
val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
if (globalMetaData == null) {
val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
return splits
}
Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach {
schemas =>
val mergedSchema = schemas
.map(DataType.fromJson(_).asInstanceOf[StructType])
.reduce(_ merge _)
.json
val mergedMetadata = globalMetaData
.getKeyValueMetaData
.updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema)))
globalMetaData = new GlobalMetaData(
globalMetaData.getSchema,
mergedMetadata,
globalMetaData.getCreatedBy)
}
val readContext = getReadSupport(configuration).init(
new InitContext(configuration,
globalMetaData.getKeyValueMetaData,


@@ -131,6 +131,11 @@ private[sql] trait ParquetTest {
data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
}
protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
}
protected def makePartitionDir(
basePath: File,
defaultPartitionName: String,


@@ -36,6 +36,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestSQLContext
import sqlContext._
import sqlContext.implicits._
val defaultPartitionName = "__NULL__"
@@ -319,4 +320,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
}
}
}
test("read partitioned table - merging compatible schemas") {
withTempDir { base =>
makeParquetFile(
(1 to 10).map(i => Tuple1(i)).toDF("intField"),
makePartitionDir(base, defaultPartitionName, "pi" -> 1))
makeParquetFile(
(1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
makePartitionDir(base, defaultPartitionName, "pi" -> 2))
load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t")
withTempTable("t") {
checkAnswer(
sql("SELECT * FROM t"),
(1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2)))
}
}
}
}