From 878527d9fae8945d087ec871bb0a5f49b6341939 Mon Sep 17 00:00:00 2001
From: fornaix
Date: Thu, 3 Jun 2021 14:07:26 -0700
Subject: [PATCH] [SPARK-35612][SQL] Support LZ4 compression in ORC data source

### What changes were proposed in this pull request?

This PR aims to support LZ4 compression in the ORC data source.

### Why are the changes needed?

Apache ORC supports LZ4 compression, but we cannot set LZ4 compression in the ORC data source.

**BEFORE**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
java.lang.IllegalArgumentException: Codec [lz4] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none, zstd.
```

**AFTER**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
```

```bash
$ orc-tools meta /tmp/lz4
Processing data file file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc [length: 222]
Structure for file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc
File Version: 0.12 with ORC_517
Rows: 10
Compression: LZ4
Compression size: 262144
Type: struct<id:bigint>

Stripe Statistics:
  Stripe 1:
    Column 0: count: 10 hasNull: false
    Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

File Statistics:
  Column 0: count: 10 hasNull: false
  Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

Stripes:
  Stripe: offset: 3 data: 7 rows: 10 tail: 35 index: 35
    Stream: column 0 section ROW_INDEX start: 3 length 11
    Stream: column 1 section ROW_INDEX start: 14 length 24
    Stream: column 1 section DATA start: 38 length 7
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2

File length: 222 bytes
Padding length: 0 bytes
Padding ratio: 0%

User Metadata:
  org.apache.spark.version=3.2.0
```
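Beyond the `compression` write option used above, the documentation touched by this patch spells out two more ways to pick the codec, with `compression` taking precedence over `orc.compress`, which takes precedence over the session conf. A minimal sketch, assuming a running `spark-shell` session; the output paths are illustrative:

```scala
// Lowest precedence: the session conf that this patch extends to accept "lz4".
spark.conf.set("spark.sql.orc.compression.codec", "lz4")
spark.range(10).write.orc("/tmp/lz4_conf")

// Middle precedence: ORC's native table property, using ORC's upper-case codec name.
spark.range(10).write.option("orc.compress", "LZ4").orc("/tmp/lz4_prop")

// Highest precedence: the data source option, as in the AFTER example above.
spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4_opt")
```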
### Does this PR introduce _any_ user-facing change?

Yes. Users can now set `lz4` as the compression codec when writing with the ORC data source.

### How was this patch tested?

Pass the newly added test case.
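Besides the `.lz4` file-name suffix asserted by the new test, the codec recorded in the file footer can be checked directly. A hedged sketch using the ORC reader API; the `codecOf` helper is ours, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// Reads the compression kind recorded in an ORC file's footer.
def codecOf(file: String): String = {
  val reader = OrcFile.createReader(new Path(file), OrcFile.readerOptions(new Configuration()))
  reader.getCompressionKind.name
}

// Expected to return "LZ4" for a part file written with the new option.
```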
Closes #32751 from fornaix/spark-35612.

Authored-by: fornaix
Signed-off-by: Dongjoon Hyun
---
 docs/sql-data-sources-orc.md                            |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala         |  4 ++--
 .../sql/execution/datasources/orc/OrcOptions.scala      |  1 +
 .../sql/execution/datasources/orc/OrcUtils.scala        |  1 +
 .../execution/datasources/orc/OrcSourceSuite.scala      | 12 +++++++++++-
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala       |  1 +
 6 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/docs/sql-data-sources-orc.md b/docs/sql-data-sources-orc.md
index d2cddde6bb..2b8908a875 100644
--- a/docs/sql-data-sources-orc.md
+++ b/docs/sql-data-sources-orc.md
@@ -194,7 +194,7 @@ Data source options of ORC can be set via:
   <tr>
     <td><code>compression</code></td>
     <td>None</td>
-    <td>compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, lzo, and zstd). This will override <code>orc.compress</code> and <code>spark.sql.orc.compression.codec</code>. If None is set, it uses the value specified in <code>spark.sql.orc.compression.codec</code>.</td>
+    <td>compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, snappy, zlib, lzo, zstd, and lz4). This will override <code>orc.compress</code> and <code>spark.sql.orc.compression.codec</code>. If None is set, it uses the value specified in <code>spark.sql.orc.compression.codec</code>.</td>
     <td>write</td>
   </tr>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e368ee1d53..1203248785 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -842,11 +842,11 @@ object SQLConf {
     .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
       "`orc.compress` is specified in the table-specific options/properties, the precedence " +
       "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
-      "Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd.")
+      "Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd, lz4.")
     .version("2.3.0")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd"))
+    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd", "lz4"))
     .createWithDefault("snappy")
 
   val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index af92d94d68..9416996198 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -78,6 +78,7 @@ object OrcOptions {
     "snappy" -> "SNAPPY",
     "zlib" -> "ZLIB",
     "lzo" -> "LZO",
+    "lz4" -> "LZ4",
     "zstd" -> "ZSTD")
 
   def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index c81ac3c940..a63a5c6d50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -46,6 +46,7 @@ object OrcUtils extends Logging {
     "SNAPPY" -> ".snappy",
     "ZLIB" -> ".zlib",
     "ZSTD" -> ".zstd",
+    "LZ4" -> ".lz4",
     "LZO" -> ".lzo")
 
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index eee8e2ecc9..9acf59cbd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -338,7 +338,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa
     }
 
     // Test all the valid options of spark.sql.orc.compression.codec
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD").foreach { c =>
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD", "LZ4").foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
         val expected = if (c == "UNCOMPRESSED") "NONE" else c
         assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
@@ -540,6 +540,16 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa
       }
     }
   }
+
+  test("SPARK-35612: Support LZ4 compression in ORC data source") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(3).write.option("compression", "lz4").orc(path)
+      checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2)))
+      val files = OrcUtils.listOrcFiles(path, spark.sessionState.newHadoopConf())
+      assert(files.nonEmpty && files.forall(_.getName.contains("lz4")))
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index d2ac06ad0a..c50ecf7a93 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -314,6 +314,7 @@ private[orc] object OrcFileFormat extends HiveInspectors with Logging {
     "NONE" -> "",
     "SNAPPY" -> ".snappy",
     "ZLIB" -> ".zlib",
+    "LZ4" -> ".lz4",
     "LZO" -> ".lzo")
 
   def unwrapOrcStructs(
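A closing illustration (not part of the patch) of the mapping the `OrcOptions` hunk adds; `getORCCompressionCodecName` is the accessor shown in the diff above, and the snippet assumes it is visible from the caller's package:

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcOptions

// The user-facing short name now resolves to ORC's codec name via the new map entry.
assert(OrcOptions.getORCCompressionCodecName("lz4") == "LZ4")
```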