[SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
There are 4 different compression codecs available for ```ParquetOutputFormat```.
In Spark SQL, the codec was previously hard-coded as ```ParquetRelation.defaultCompression```.
Original discussion:
https://github.com/apache/spark/pull/195#discussion-diff-11002083
I added a new config property in SQLConf that allows the user to change this compression codec, using a short-name syntax similar to the one described in SPARK-2953 #1873 (https://github.com/apache/spark/pull/1873/files#diff-0).
By the way, which codec should we use as the default? It was set to GZIP (https://github.com/apache/spark/pull/195/files#diff-4), but I think we should change it to SNAPPY, since SNAPPY is already the default codec for shuffling in spark-core (SPARK-2469, #1415), and parquet-mr supports the Snappy codec natively (e440108de5).
Author: chutium <teng.qiu@gmail.com>
Closes #2039 from chutium/parquet-compression and squashes the following commits:
2f44964 [chutium] [SPARK-3131][SQL] parquet compression default codec set to snappy, also in test suite
e578e21 [chutium] [SPARK-3131][SQL] compression codec config property name and default codec set to snappy
21235dc [chutium] [SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
This commit is contained in:
parent
b21ae5bbb9
commit
8856c3d860
|
@ -33,6 +33,7 @@ private[spark] object SQLConf {
|
|||
val DIALECT = "spark.sql.dialect"
|
||||
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
|
||||
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
|
||||
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
|
||||
|
||||
// This is only used for the thriftserver
|
||||
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
|
||||
|
@ -78,6 +79,9 @@ trait SQLConf {
|
|||
/** When true tables cached using the in-memory columnar caching will be compressed. */
|
||||
private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
|
||||
|
||||
/** The compression codec to use when writing Parquet files, given by its short name (default "snappy"). */
|
||||
private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
|
||||
|
||||
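/** The number of rows that will be placed in each column batch. NOTE(review): the original comment was truncated; confirm the intended wording. */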
|
||||
private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
|
||||
|
||||
|
|
|
@ -100,8 +100,13 @@ private[sql] object ParquetRelation {
|
|||
// The compression type
|
||||
type CompressionType = parquet.hadoop.metadata.CompressionCodecName
|
||||
|
||||
// The default compression
|
||||
val defaultCompression = CompressionCodecName.GZIP
|
||||
// Short names accepted for the Parquet compression codec setting (keys are upper case), mapped to Parquet codec names
|
||||
val shortParquetCompressionCodecNames = Map(
|
||||
"NONE" -> CompressionCodecName.UNCOMPRESSED,
|
||||
"UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
|
||||
"SNAPPY" -> CompressionCodecName.SNAPPY,
|
||||
"GZIP" -> CompressionCodecName.GZIP,
|
||||
"LZO" -> CompressionCodecName.LZO)
|
||||
|
||||
/**
|
||||
* Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
|
||||
|
@ -141,9 +146,8 @@ private[sql] object ParquetRelation {
|
|||
conf: Configuration,
|
||||
sqlContext: SQLContext): ParquetRelation = {
|
||||
val path = checkPath(pathString, allowExisting, conf)
|
||||
if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
|
||||
conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
|
||||
}
|
||||
conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
|
||||
sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
|
||||
ParquetRelation.enableLogForwarding()
|
||||
ParquetTypesConverter.writeMetaData(attributes, path, conf)
|
||||
new ParquetRelation(path.toString, Some(conf), sqlContext) {
|
||||
|
|
|
@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
|
|||
TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
|
||||
}
|
||||
|
||||
test("Compression options for writing to a Parquetfile") {
  // Explicit converters (.asScala) instead of the implicit JavaConversions, so that every
  // Java-to-Scala collection conversion is visible at the call site.
  import scala.collection.JavaConverters._

  // Codec configured before this test starts; restored in the outer `finally` below so that a
  // failing assertion cannot leak a modified setting into tests that run afterwards.
  val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec

  val file = getTempFilePath("parquet")
  val path = file.toString
  val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
    .map(i => TestRDDEntry(i, s"val_$i"))

  /**
   * Writes `rdd` to `path` using the currently configured codec, then verifies that
   *  - every column chunk recorded in the Parquet metadata uses exactly `expectedCodec`, and
   *  - the written file can be read back and queried.
   * The output is removed afterwards (even when a check fails) so the next round can reuse
   * the same path.
   */
  def checkWrittenCodec(expectedCodec: String): Unit = {
    try {
      rdd.saveAsParquetFile(path)

      val actualCodec = ParquetTypesConverter
        .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
        .getBlocks.asScala
        .flatMap(block => block.getColumns.asScala)
        .map(column => column.getCodec.name())
        .distinct
      assert(actualCodec === expectedCodec :: Nil)

      parquetFile(path).registerTempTable("tmp")
      checkAnswer(
        sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
        (5, "val_5") ::
        (7, "val_7") :: Nil)
    } finally {
      Utils.deleteRecursively(file)
    }
  }

  try {
    // test default compression codec (nothing is set explicitly for this round)
    checkWrittenCodec(defaultParquetCompressionCodec.toUpperCase)

    // configured property value -> codec name expected in the written file's metadata
    val configuredToExpected = Seq(
      "UNCOMPRESSED" -> "UNCOMPRESSED", // uncompressed, spelled out
      "none" -> "UNCOMPRESSED",         // uncompressed, via the "none" alias
      "gzip" -> "GZIP",
      "snappy" -> "SNAPPY")

    configuredToExpected.foreach { case (configured, expected) =>
      TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, configured)
      checkWrittenCodec(expected)
    }

    // TODO: Lzo requires additional external setup steps so leave it out for now
    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
  } finally {
    // Set it back.
    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
  }
}
|
||||
|
||||
test("Read/Write All Types with non-primitive type") {
|
||||
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
|
||||
val range = (0 to 255)
|
||||
|
|
Loading…
Reference in a new issue