[SPARK-35612][SQL] Support LZ4 compression in ORC data source

### What changes were proposed in this pull request?

This PR aims to support LZ4 compression in the ORC data source.

### Why are the changes needed?

Apache ORC supports LZ4 compression, but it currently cannot be selected as a compression codec in the ORC data source.

**BEFORE**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
java.lang.IllegalArgumentException: Codec [lz4] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none, zstd.
```

**AFTER**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
```
```bash
$ orc-tools meta /tmp/lz4
Processing data file file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc [length: 222]
Structure for file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc
File Version: 0.12 with ORC_517
Rows: 10
Compression: LZ4
Compression size: 262144
Type: struct<id:bigint>

Stripe Statistics:
  Stripe 1:
    Column 0: count: 10 hasNull: false
    Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

File Statistics:
  Column 0: count: 10 hasNull: false
  Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

Stripes:
  Stripe: offset: 3 data: 7 rows: 10 tail: 35 index: 35
    Stream: column 0 section ROW_INDEX start: 3 length 11
    Stream: column 1 section ROW_INDEX start: 14 length 24
    Stream: column 1 section DATA start: 38 length 7
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2

File length: 222 bytes
Padding length: 0 bytes
Padding ratio: 0%

User Metadata:
  org.apache.spark.version=3.2.0
```

### Does this PR introduce _any_ user-facing change?

Yes. Users can now write LZ4-compressed ORC files by setting `compression` (or `orc.compress` / `spark.sql.orc.compression.codec`) to `lz4`.
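
For illustration, a minimal sketch of the equivalent ways to request LZ4 after this change (output paths are placeholders); per the `SQLConf` doc string below, the precedence is `compression` > `orc.compress` > `spark.sql.orc.compression.codec`:

```scala
// Any of the following should now produce LZ4-compressed ORC output.
spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4_option")
spark.range(10).write.option("orc.compress", "LZ4").orc("/tmp/lz4_table_prop")

// The session conf applies when neither per-write option is set.
spark.conf.set("spark.sql.orc.compression.codec", "lz4")
spark.range(10).write.orc("/tmp/lz4_session_conf")
```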

### How was this patch tested?

Pass the newly added test case.

Closes #32751 from fornaix/spark-35612.

Authored-by: fornaix <foxnaix@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

```diff
@@ -194,7 +194,7 @@ Data source options of ORC can be set via:
   <tr>
     <td><code>compression</code></td>
     <td>None</td>
-    <td>compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, lzo, and zstd). This will override <code>orc.compress</code> and <code>spark.sql.orc.compression.codec</code>. If None is set, it uses the value specified in <code>spark.sql.orc.compression.codec</code>.</td>
+    <td>compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, lzo, zstd and lz4). This will override <code>orc.compress</code> and <code>spark.sql.orc.compression.codec</code>. If None is set, it uses the value specified in <code>spark.sql.orc.compression.codec</code>.</td>
     <td>write</td>
   </tr>
 </table>
```

```diff
@@ -842,11 +842,11 @@ object SQLConf {
     .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
       "`orc.compress` is specified in the table-specific options/properties, the precedence " +
       "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
-      "Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd.")
+      "Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd, lz4.")
     .version("2.3.0")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd"))
+    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd", "lz4"))
     .createWithDefault("snappy")

   val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
```
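
As a usage note, a small sketch of the precedence described in the doc string above (output path is hypothetical): the `compression` option beats `orc.compress`, which beats the session conf:

```scala
// With both set, `compression` wins, so this writes ZSTD files, not LZ4.
spark.range(10).write
  .option("orc.compress", "LZ4")
  .option("compression", "zstd")
  .orc("/tmp/orc_precedence_demo")
```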

```diff
@@ -78,6 +78,7 @@ object OrcOptions {
     "snappy" -> "SNAPPY",
     "zlib" -> "ZLIB",
     "lzo" -> "LZO",
+    "lz4" -> "LZ4",
     "zstd" -> "ZSTD")

   def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
```
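
`getORCCompressionCodecName` simply looks the short name up in this map, which is why `lz4` was previously rejected with the "Codec [lz4] is not available" error shown in the BEFORE example. A minimal sketch of the new mapping (Spark-internal code, not public API):

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcOptions

// "lz4" now resolves to ORC's codec name instead of failing the lookup.
assert(OrcOptions.getORCCompressionCodecName("lz4") == "LZ4")
```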

```diff
@@ -46,6 +46,7 @@ object OrcUtils extends Logging {
     "SNAPPY" -> ".snappy",
     "ZLIB" -> ".zlib",
     "ZSTD" -> ".zstd",
+    "LZ4" -> ".lz4",
     "LZO" -> ".lzo")

   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
```
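
This extensions map supplies the codec suffix for written file names, which is what both the `orc-tools` output above and the new regression test check. A minimal sketch using the same Spark-internal helper as the test (path is a placeholder):

```scala
spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")

// Output files now carry the codec suffix, e.g. part-...-c000.lz4.orc.
val files = OrcUtils.listOrcFiles("/tmp/lz4", spark.sessionState.newHadoopConf())
assert(files.nonEmpty && files.forall(_.getName.endsWith(".lz4.orc")))
```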

```diff
@@ -338,7 +338,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa
     }

     // Test all the valid options of spark.sql.orc.compression.codec
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD").foreach { c =>
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD", "LZ4").foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
         val expected = if (c == "UNCOMPRESSED") "NONE" else c
         assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
@@ -540,6 +540,16 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa
       }
     }
   }
+
+  test("SPARK-35612: Support LZ4 compression in ORC data source") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(3).write.option("compression", "lz4").orc(path)
+      checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2)))
+      val files = OrcUtils.listOrcFiles(path, spark.sessionState.newHadoopConf())
+      assert(files.nonEmpty && files.forall(_.getName.contains("lz4")))
+    }
+  }
 }

 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
```

```diff
@@ -314,6 +314,7 @@ private[orc] object OrcFileFormat extends HiveInspectors with Logging {
     "NONE" -> "",
     "SNAPPY" -> ".snappy",
     "ZLIB" -> ".zlib",
+    "LZ4" -> ".lz4",
     "LZO" -> ".lzo")

   def unwrapOrcStructs(
```
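
The Hive-based ORC writer keeps a parallel suffix map, so the file naming matches when `spark.sql.orc.impl` is switched to `hive`. A hedged sketch, assuming a Hive-enabled session (path is a placeholder):

```scala
// Select the Hive ORC implementation; LZ4 output should be suffixed
// the same way as with the native implementation.
spark.conf.set("spark.sql.orc.impl", "hive")
spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4_hive")
```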