[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Currently, Spark writes Parquet files only with writer version1, ignoring the writer version given by the user.

So, this PR keeps the writer version if one is given, and otherwise sets version1 as the default.
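For example, after this change a user can ask for version2 files through the Hadoop configuration before writing. A minimal sketch against the 1.6-era SQLContext API (the output path is illustrative):

import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

// Request writer version2; before this patch the setting was silently
// overwritten with PARQUET_1_0 by CatalystWriteSupport.setSchema.
sqlContext.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

sqlContext.range(100).write.parquet("/tmp/parquet-v2")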

Author: hyukjinkwon <gurwls223@gmail.com>
Author: HyukjinKwon <gurwls223@gmail.com>

Closes #9060 from HyukjinKwon/SPARK-11044.
hyukjinkwon 2015-11-16 21:30:10 +08:00 committed by Cheng Lian
parent 42de5253f3
commit 7f8eb3bf6e
2 changed files with 35 additions and 1 deletion

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala

@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
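The one-line fix swaps Configuration.set for Configuration.setIfUnset, which only writes the default when the key has no value yet, so a user-supplied writer version survives. A self-contained sketch of the difference, using the raw key behind ParquetOutputFormat.WRITER_VERSION:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration(false)
conf.set("parquet.writer.version", "PARQUET_2_0")        // the user's choice
conf.setIfUnset("parquet.writer.version", "PARQUET_1_0") // default no longer clobbers it
assert(conf.get("parquet.writer.version") == "PARQUET_2_0")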

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Collections
 
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+    // For dictionary encoding, Parquet changes the encoding types according to its writer
+    // version. So, this test checks one of the encoding types in order to ensure that
+    // the file is written with writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Write a Parquet file with writer version2.
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      } finally {
+        // Manually clear the hadoop configuration for other tests.
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
+
   test("read dictionary encoded decimals written as INT32") {
     checkAnswer(
       // Decimal column in this file is encoded using plain dictionary
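The same encoding check used by the test above can be run against any existing Parquet file to tell which writer version produced it. A hedged sketch using the pre-1.9 parquet-hadoop footer API (the file path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.Encoding
import org.apache.parquet.hadoop.ParquetFileReader

val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/tmp/parquet-v2/part-r-00000.parquet"))
val encodings = footer.getBlocks.get(0).getColumns.get(0).getEncodings
// Version2 files report RLE_DICTIONARY; version1 files report PLAIN_DICTIONARY.
println(encodings.contains(Encoding.RLE_DICTIONARY))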