[SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema

When writing Parquet files, Spark 1.1.x persists the schema into Parquet metadata as the result of `StructType.toString`. Spark 1.2 deprecated that format in favor of a JSON schema string, but we still need to take the old format into account when reading Parquet files written by earlier versions.
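For illustration only, a minimal sketch of the resulting parsing order (the real change lives in `ParquetRelation2` below and additionally logs and swallows parse failures; `parseSchemaString` is just a hypothetical helper name, and it assumes both parsers are accessible from the calling package):

```scala
import scala.util.Try

import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper, not part of this patch: first try the JSON schema string
// written since Spark 1.2, then fall back to the deprecated case class string
// produced by Spark 1.1.x via StructType.toString, e.g.
//   StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))
// Returns None when neither parser accepts the input.
def parseSchemaString(serialized: String): Option[StructType] =
  Try(DataType.fromJson(serialized))
    .recover { case _: Throwable => DataType.fromCaseClassString(serialized) }
    .toOption
    .collect { case schema: StructType => schema }
```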


Author: Cheng Lian <lian@databricks.com>

Closes #5034 from liancheng/spark-6315 and squashes the following commits:

a182f58 [Cheng Lian] Adds a regression test
b9c6dbe [Cheng Lian] Also tries the case class string parser while reading Parquet schema
Cheng Lian 2015-03-21 11:18:45 +08:00
parent bc37c9743e
commit 937c1e5503
2 changed files with 60 additions and 5 deletions

@@ -681,7 +681,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private[sql] object ParquetRelation2 {
+private[sql] object ParquetRelation2 extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   val MERGE_SCHEMA = "mergeSchema"
 
@@ -701,7 +701,26 @@ private[sql] object ParquetRelation2 {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .map(DataType.fromJson(_).asInstanceOf[StructType])
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }
 
       maybeSparkSchema.getOrElse {
         // Falls back to Parquet schema if Spark SQL schema is absent.

@@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup
 import parquet.example.data.{Group, GroupWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
 import parquet.io.api.RecordConsumer
 import parquet.schema.{MessageType, MessageTypeParser}
 
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
@@ -330,6 +330,42 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("SPARK-6315 regression test") {
+    // Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata.
+    // This has been deprecated by JSON format since 1.2. Notice that, 1.3 further refactored data
+    // types API, and made StructType.fields an array. This makes the result of StructType.toString
+    // different from prior versions: there's no "Seq" wrapping the fields part in the string now.
+    val sparkSchema =
+      "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"
+
+    // The Parquet schema is intentionally made different from the Spark schema. Because the new
+    // Parquet data source simply falls back to the Parquet schema once it fails to parse the Spark
+    // schema. By making these two different, we are able to assert the old style case class string
+    // is parsed successfully.
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int32 c;
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val extraMetadata = Map(RowReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
+      val path = new Path(location.getCanonicalPath)
+
+      ParquetFileWriter.writeMetadataFile(
+        sparkContext.hadoopConfiguration,
+        path,
+        new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
+
+      assertResult(parquetFile(path.toString).schema) {
+        StructType(
+          StructField("a", BooleanType, nullable = false) ::
+          StructField("b", IntegerType, nullable = false) ::
+          Nil)
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {