[SPARK-33978][SQL] Support ZSTD compression in ORC data source

### What changes were proposed in this pull request?

This PR aims to support ZSTD compression in ORC data source.

### Why are the changes needed?

Apache ORC 1.6 supports ZSTD compression to generate more compact files and save the storage cost.
- https://issues.apache.org/jira/browse/ORC-363

**BEFORE**
```scala
scala> spark.range(10).write.option("compression", "zstd").orc("/tmp/zstd")
java.lang.IllegalArgumentException: Codec [zstd] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none.
```

**AFTER**
```scala
scala> spark.range(10).write.option("compression", "zstd").orc("/tmp/zstd")
```

```bash
$ orc-tools meta /tmp/zstd
Processing data file file:/tmp/zstd/part-00011-a63d9a17-456f-42d3-87a1-d922112ed28c-c000.orc [length: 230]
Structure for file:/tmp/zstd/part-00011-a63d9a17-456f-42d3-87a1-d922112ed28c-c000.orc
File Version: 0.12 with ORC_14
Rows: 1
Compression: ZSTD
Compression size: 262144
Calendar: Julian/Gregorian
Type: struct<id:bigint>

Stripe Statistics:
  Stripe 1:
    Column 0: count: 1 hasNull: false
    Column 1: count: 1 hasNull: false bytesOnDisk: 6 min: 9 max: 9 sum: 9

File Statistics:
  Column 0: count: 1 hasNull: false
  Column 1: count: 1 hasNull: false bytesOnDisk: 6 min: 9 max: 9 sum: 9

Stripes:
  Stripe: offset: 3 data: 6 rows: 1 tail: 35 index: 35
    Stream: column 0 section ROW_INDEX start: 3 length 11
    Stream: column 1 section ROW_INDEX start: 14 length 24
    Stream: column 1 section DATA start: 38 length 6
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2

File length: 230 bytes
Padding length: 0 bytes
Padding ratio: 0%

User Metadata:
  org.apache.spark.version=3.2.0
```

### Does this PR introduce _any_ user-facing change?

Yes, this is a new feature.

### How was this patch tested?

Pass the newly added test case.

Closes #31002 from dongjoon-hyun/SPARK-33978.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Dongjoon Hyun 2021-01-04 00:54:47 -08:00
parent 8b3fb43f40
commit 271c4f6e00
5 changed files with 15 additions and 6 deletions

View file

@@ -1391,7 +1391,7 @@ class DataFrameWriter(OptionUtils):
names of partitioning columns
compression : str, optional
compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, snappy, zlib, and lzo).
known case-insensitive shorten names (none, snappy, zlib, lzo, and zstd).
This will override ``orc.compress`` and
``spark.sql.orc.compression.codec``. If None is set, it uses the value
specified in ``spark.sql.orc.compression.codec``.

View file

@@ -796,11 +796,11 @@ object SQLConf {
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
"would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
"Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd.")
.version("2.3.0")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd"))
.createWithDefault("snappy")
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")

View file

@@ -885,7 +885,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <ul>
* <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
* compression codec to use when saving to file. This can be one of the known case-insensitive
* shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
* shorten names(`none`, `snappy`, `zlib`, `lzo`, and `zstd`). This will override
* `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
* it overrides `spark.sql.orc.compression.codec`.</li>
* </ul>

View file

@@ -77,7 +77,8 @@ object OrcOptions {
"uncompressed" -> "NONE",
"snappy" -> "SNAPPY",
"zlib" -> "ZLIB",
"lzo" -> "LZO")
"lzo" -> "LZO",
"zstd" -> "ZSTD")
def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
}

View file

@@ -337,7 +337,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa
}
// Test all the valid options of spark.sql.orc.compression.codec
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD").foreach { c =>
withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
val expected = if (c == "UNCOMPRESSED") "NONE" else c
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
@@ -594,4 +594,12 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
val df = readResourceOrcFile("test-data/TestStringDictionary.testRowIndex.orc")
assert(df.where("str < 'row 001000'").count() === 1000)
}
test("SPARK-33978: Write and read a file with ZSTD compression") {
withTempPath { dir =>
val path = dir.getAbsolutePath
spark.range(3).write.option("compression", "zstd").orc(path)
checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2)))
}
}
}