[SPARK-36669][SQL] Add Lz4 wrappers for Hadoop Lz4 codec

### What changes were proposed in this pull request?

This patch proposes to add a few LZ4 wrapper classes for Parquet Lz4 compression output that uses Hadoop Lz4 codec.

### Why are the changes needed?

Currently we use Hadoop 3.3.1's shaded client libraries. Lz4 is a provided dependency in Hadoop Common 3.3.1 for Lz4Codec. But it isn't excluded from relocation in these libraries. So to use lz4 as a Parquet codec, we will hit the following exception even if we include lz4 as a dependency.

```
[info]   Cause: java.lang.NoClassDefFoundError: org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory
[info]   at org.apache.hadoop.io.compress.lz4.Lz4Compressor.<init>(Lz4Compressor.java:66)
[info]   at org.apache.hadoop.io.compress.Lz4Codec.createCompressor(Lz4Codec.java:119)
[info]   at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
[info]   at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
```

Before the issue is fixed at Hadoop new release, we can add a few wrapper classes for Lz4 codec.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified test.

Closes #33940 from viirya/lz4-wrappers.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Liang-Chi Hsieh 2021-09-09 09:31:00 -07:00
parent 0227793f9e
commit 6bcf330191
5 changed files with 128 additions and 7 deletions

View file

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.shaded.net.jpountz.lz4;
/**
 * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
 * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
 * This does not need to implement the full net.jpountz.lz4.LZ4Compressor API, just the
 * methods used by Hadoop's Lz4Compressor.
 */
public final class LZ4Compressor {

  // Real lz4-java compressor this shim delegates to; set once and never reassigned.
  private final net.jpountz.lz4.LZ4Compressor lz4Compressor;

  /**
   * Wraps the given lz4-java compressor so it is reachable under the shaded
   * {@code org.apache.hadoop.shaded.net.jpountz.lz4} package name.
   *
   * @param lz4Compressor the real compressor to delegate to
   */
  public LZ4Compressor(net.jpountz.lz4.LZ4Compressor lz4Compressor) {
    this.lz4Compressor = lz4Compressor;
  }

  /**
   * Compresses {@code src} into {@code dest} by delegating to the wrapped compressor.
   *
   * @param src buffer containing the data to compress
   * @param dest buffer receiving the compressed data
   */
  public void compress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
    lz4Compressor.compress(src, dest);
  }
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.shaded.net.jpountz.lz4;
/**
 * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
 * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
 * This does not need to implement the full net.jpountz.lz4.LZ4Factory API, just the
 * methods used by Hadoop's Lz4Compressor.
 */
public final class LZ4Factory {

  // Real lz4-java factory this shim delegates to; set once and never reassigned.
  private final net.jpountz.lz4.LZ4Factory lz4Factory;

  /**
   * Wraps the given lz4-java factory so it is reachable under the shaded
   * {@code org.apache.hadoop.shaded.net.jpountz.lz4} package name.
   *
   * @param lz4Factory the real factory to delegate to
   */
  public LZ4Factory(net.jpountz.lz4.LZ4Factory lz4Factory) {
    this.lz4Factory = lz4Factory;
  }

  /** Returns a shim around the fastest available lz4-java factory instance. */
  public static LZ4Factory fastestInstance() {
    return new LZ4Factory(net.jpountz.lz4.LZ4Factory.fastestInstance());
  }

  /** Returns a shim around this factory's high-compression-ratio compressor. */
  public LZ4Compressor highCompressor() {
    return new LZ4Compressor(lz4Factory.highCompressor());
  }

  /** Returns a shim around this factory's fast (default-ratio) compressor. */
  public LZ4Compressor fastCompressor() {
    return new LZ4Compressor(lz4Factory.fastCompressor());
  }

  /** Returns a shim around this factory's safe (bounds-checked) decompressor. */
  public LZ4SafeDecompressor safeDecompressor() {
    return new LZ4SafeDecompressor(lz4Factory.safeDecompressor());
  }
}

View file

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.shaded.net.jpountz.lz4;
/**
 * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
 * Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
 * This does not need to implement the full net.jpountz.lz4.LZ4SafeDecompressor API, just
 * the methods used by Hadoop's Lz4Decompressor.
 */
public final class LZ4SafeDecompressor {

  // Real lz4-java decompressor this shim delegates to; set once and never reassigned.
  private final net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor;

  /**
   * Wraps the given lz4-java decompressor so it is reachable under the shaded
   * {@code org.apache.hadoop.shaded.net.jpountz.lz4} package name.
   *
   * @param lz4Decompressor the real decompressor to delegate to
   */
  public LZ4SafeDecompressor(net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor) {
    this.lz4Decompressor = lz4Decompressor;
  }

  /**
   * Decompresses {@code src} into {@code dest} by delegating to the wrapped decompressor.
   *
   * @param src buffer containing the compressed data
   * @param dest buffer receiving the decompressed data
   */
  public void decompress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
    lz4Decompressor.decompress(src, dest);
  }
}

View file

@ -28,7 +28,7 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec, Lz4Codec}
import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
@ -136,9 +136,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
// Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
// "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
case (codec, codecName) =>
// "snappy" codec does not work due to SPARK-36681.
Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new Lz4Codec(), "lz4"))
.foreach { case (codec, codecName) =>
runSequenceFileCodecTest(codec, codecName)
}

View file

@ -56,13 +56,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
override def format: String = "parquet"
override val codecConfigName: String = SQLConf.PARQUET_COMPRESSION.key
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// TODO(SPARK-36669): "lz4" codec fails due to HADOOP-17891.
override protected def availableCodecs: Seq[String] =
if (System.getProperty("os.arch") == "aarch64") {
// Exclude "brotli" due to PARQUET-1975.
Seq("none", "uncompressed", "snappy", "gzip", "zstd")
Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd")
} else {
Seq("none", "uncompressed", "snappy", "gzip", "brotli", "zstd")
Seq("none", "uncompressed", "snappy", "lz4", "gzip", "brotli", "zstd")
}
}