[SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0)
## What changes were proposed in this pull request?
This pr updated `lz4-java` to the latest (v1.4.0) and removed custom `LZ4BlockInputStream`. We currently use custom `LZ4BlockInputStream` to read concatenated byte stream in shuffle. But, this functionality has been implemented in the latest lz4-java (https://github.com/lz4/lz4-java/pull/105). So, we might update the latest to remove the custom `LZ4BlockInputStream`.
Major diffs between the latest release and v1.3.0 in the master are as follows (62f7547abb...6d4693f562
);
- fixed NPE in XXHashFactory similarly
- Don't place resources in default package to support shading
- Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed
- Try to load lz4-java from java.library.path, then fallback to bundled
- Add ppc64le binary
- Add s390x JNI binding
- Add basic LZ4 Frame v1.5.0 support
- enable aarch64 support for lz4-java
- Allow unsafeInstance() for ppc64le archiecture
- Add unsafeInstance support for AArch64
- Support 64-bit JNI build on Solaris
- Avoid over-allocating a buffer
- Allow EndMark to be incompressible for LZ4FrameInputStream.
- Concat byte stream
## How was this patch tested?
Existing tests.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes #18883 from maropu/SPARK-21276.
This commit is contained in:
parent
83fe3b5e10
commit
b78cf13bf0
|
@ -190,8 +190,8 @@
|
|||
<artifactId>snappy-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.jpountz.lz4</groupId>
|
||||
<artifactId>lz4</artifactId>
|
||||
<groupId>org.lz4</groupId>
|
||||
<artifactId>lz4-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.roaringbitmap</groupId>
|
||||
|
|
|
@ -1,260 +0,0 @@
|
|||
/*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.io;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
import net.jpountz.lz4.LZ4Exception;
|
||||
import net.jpountz.lz4.LZ4Factory;
|
||||
import net.jpountz.lz4.LZ4FastDecompressor;
|
||||
import net.jpountz.util.SafeUtils;
|
||||
import net.jpountz.xxhash.XXHashFactory;
|
||||
|
||||
/**
|
||||
* {@link InputStream} implementation to decode data written with
|
||||
* {@link net.jpountz.lz4.LZ4BlockOutputStream}. This class is not thread-safe and does not
|
||||
* support {@link #mark(int)}/{@link #reset()}.
|
||||
* @see net.jpountz.lz4.LZ4BlockOutputStream
|
||||
*
|
||||
* This is based on net.jpountz.lz4.LZ4BlockInputStream
|
||||
*
|
||||
* changes: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411
|
||||
*
|
||||
* TODO: merge this into upstream
|
||||
*/
|
||||
public final class LZ4BlockInputStream extends FilterInputStream {
|
||||
|
||||
// Copied from net.jpountz.lz4.LZ4BlockOutputStream
|
||||
static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
|
||||
static final int MAGIC_LENGTH = MAGIC.length;
|
||||
|
||||
static final int HEADER_LENGTH =
|
||||
MAGIC_LENGTH // magic bytes
|
||||
+ 1 // token
|
||||
+ 4 // compressed length
|
||||
+ 4 // decompressed length
|
||||
+ 4; // checksum
|
||||
|
||||
static final int COMPRESSION_LEVEL_BASE = 10;
|
||||
|
||||
static final int COMPRESSION_METHOD_RAW = 0x10;
|
||||
static final int COMPRESSION_METHOD_LZ4 = 0x20;
|
||||
|
||||
static final int DEFAULT_SEED = 0x9747b28c;
|
||||
|
||||
private final LZ4FastDecompressor decompressor;
|
||||
private final Checksum checksum;
|
||||
private byte[] buffer;
|
||||
private byte[] compressedBuffer;
|
||||
private int originalLen;
|
||||
private int o;
|
||||
private boolean finished;
|
||||
|
||||
/**
|
||||
* Create a new {@link InputStream}.
|
||||
*
|
||||
* @param in the {@link InputStream} to poll
|
||||
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
|
||||
* use
|
||||
* @param checksum the {@link Checksum} instance to use, must be
|
||||
* equivalent to the instance which has been used to
|
||||
* write the stream
|
||||
*/
|
||||
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
|
||||
super(in);
|
||||
this.decompressor = decompressor;
|
||||
this.checksum = checksum;
|
||||
this.buffer = new byte[0];
|
||||
this.compressedBuffer = new byte[HEADER_LENGTH];
|
||||
o = originalLen = 0;
|
||||
finished = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance using {@link net.jpountz.xxhash.XXHash32} for checksuming.
|
||||
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
|
||||
* @see net.jpountz.xxhash.StreamingXXHash32#asChecksum()
|
||||
*/
|
||||
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
|
||||
this(in, decompressor,
|
||||
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance which uses the fastest {@link LZ4FastDecompressor} available.
|
||||
* @see LZ4Factory#fastestInstance()
|
||||
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
|
||||
*/
|
||||
public LZ4BlockInputStream(InputStream in) {
|
||||
this(in, LZ4Factory.fastestInstance().fastDecompressor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
refill();
|
||||
return originalLen - o;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
refill();
|
||||
if (finished) {
|
||||
return -1;
|
||||
}
|
||||
return buffer[o++] & 0xFF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
SafeUtils.checkRange(b, off, len);
|
||||
refill();
|
||||
if (finished) {
|
||||
return -1;
|
||||
}
|
||||
len = Math.min(len, originalLen - o);
|
||||
System.arraycopy(buffer, o, b, off, len);
|
||||
o += len;
|
||||
return len;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
return read(b, 0, b.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(long n) throws IOException {
|
||||
refill();
|
||||
if (finished) {
|
||||
return -1;
|
||||
}
|
||||
final int skipped = (int) Math.min(n, originalLen - o);
|
||||
o += skipped;
|
||||
return skipped;
|
||||
}
|
||||
|
||||
private void refill() throws IOException {
|
||||
if (finished || o < originalLen) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
readFully(compressedBuffer, HEADER_LENGTH);
|
||||
} catch (EOFException e) {
|
||||
finished = true;
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < MAGIC_LENGTH; ++i) {
|
||||
if (compressedBuffer[i] != MAGIC[i]) {
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
}
|
||||
final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
|
||||
final int compressionMethod = token & 0xF0;
|
||||
final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
|
||||
if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4)
|
||||
{
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
final int compressedLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1);
|
||||
originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
|
||||
final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
|
||||
assert HEADER_LENGTH == MAGIC_LENGTH + 13;
|
||||
if (originalLen > 1 << compressionLevel
|
||||
|| originalLen < 0
|
||||
|| compressedLen < 0
|
||||
|| (originalLen == 0 && compressedLen != 0)
|
||||
|| (originalLen != 0 && compressedLen == 0)
|
||||
|| (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
if (originalLen == 0 && compressedLen == 0) {
|
||||
if (check != 0) {
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
refill();
|
||||
return;
|
||||
}
|
||||
if (buffer.length < originalLen) {
|
||||
buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
|
||||
}
|
||||
switch (compressionMethod) {
|
||||
case COMPRESSION_METHOD_RAW:
|
||||
readFully(buffer, originalLen);
|
||||
break;
|
||||
case COMPRESSION_METHOD_LZ4:
|
||||
if (compressedBuffer.length < originalLen) {
|
||||
compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
|
||||
}
|
||||
readFully(compressedBuffer, compressedLen);
|
||||
try {
|
||||
final int compressedLen2 =
|
||||
decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
|
||||
if (compressedLen != compressedLen2) {
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
} catch (LZ4Exception e) {
|
||||
throw new IOException("Stream is corrupted", e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
checksum.reset();
|
||||
checksum.update(buffer, 0, originalLen);
|
||||
if ((int) checksum.getValue() != check) {
|
||||
throw new IOException("Stream is corrupted");
|
||||
}
|
||||
o = 0;
|
||||
}
|
||||
|
||||
private void readFully(byte[] b, int len) throws IOException {
|
||||
int read = 0;
|
||||
while (read < len) {
|
||||
final int r = in.read(b, read, len - read);
|
||||
if (r < 0) {
|
||||
throw new EOFException("Stream ended prematurely");
|
||||
}
|
||||
read += r;
|
||||
}
|
||||
assert len == read;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@SuppressWarnings("sync-override")
|
||||
@Override
|
||||
public void mark(int readlimit) {
|
||||
// unsupported
|
||||
}
|
||||
|
||||
@SuppressWarnings("sync-override")
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
throw new IOException("mark/reset not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "(in=" + in
|
||||
+ ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
|
||||
}
|
||||
|
||||
}
|
|
@ -21,7 +21,7 @@ import java.io._
|
|||
import java.util.Locale
|
||||
|
||||
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
|
||||
import net.jpountz.lz4.LZ4BlockOutputStream
|
||||
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
|
||||
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
|
@ -115,7 +115,10 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
|
|||
new LZ4BlockOutputStream(s, blockSize)
|
||||
}
|
||||
|
||||
override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
|
||||
override def compressedInputStream(s: InputStream): InputStream = {
|
||||
val disableConcatenationOfByteStream = false
|
||||
new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ leveldbjni-all-1.8.jar
|
|||
libfb303-0.9.3.jar
|
||||
libthrift-0.9.3.jar
|
||||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
lz4-java-1.4.0.jar
|
||||
machinist_2.11-0.6.1.jar
|
||||
macro-compat_2.11-1.1.1.jar
|
||||
mail-1.4.7.jar
|
||||
|
|
|
@ -134,7 +134,7 @@ leveldbjni-all-1.8.jar
|
|||
libfb303-0.9.3.jar
|
||||
libthrift-0.9.3.jar
|
||||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
lz4-java-1.4.0.jar
|
||||
machinist_2.11-0.6.1.jar
|
||||
macro-compat_2.11-1.1.1.jar
|
||||
mail-1.4.7.jar
|
||||
|
|
4
external/kafka-0-10-assembly/pom.xml
vendored
4
external/kafka-0-10-assembly/pom.xml
vendored
|
@ -65,8 +65,8 @@
|
|||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.jpountz.lz4</groupId>
|
||||
<artifactId>lz4</artifactId>
|
||||
<groupId>org.lz4</groupId>
|
||||
<artifactId>lz4-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
4
external/kafka-0-8-assembly/pom.xml
vendored
4
external/kafka-0-8-assembly/pom.xml
vendored
|
@ -65,8 +65,8 @@
|
|||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.jpountz.lz4</groupId>
|
||||
<artifactId>lz4</artifactId>
|
||||
<groupId>org.lz4</groupId>
|
||||
<artifactId>lz4-java</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -531,9 +531,9 @@
|
|||
<scope>${hadoop.deps.scope}</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.jpountz.lz4</groupId>
|
||||
<artifactId>lz4</artifactId>
|
||||
<version>1.3.0</version>
|
||||
<groupId>org.lz4</groupId>
|
||||
<artifactId>lz4-java</artifactId>
|
||||
<version>1.4.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.clearspring.analytics</groupId>
|
||||
|
|
|
@ -41,7 +41,10 @@ object MimaExcludes {
|
|||
|
||||
// [SPARK-19937] Add remote bytes read to disk.
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
|
||||
|
||||
// [SPARK-21276] Update lz4-java to the latest (v1.4.0)
|
||||
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream")
|
||||
)
|
||||
|
||||
// Exclude rules for 2.2.x
|
||||
|
|
Loading…
Reference in a new issue