[SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
## What changes were proposed in this pull request? This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection. ## How was this patch tested? Existing Parquet tests. Running in production at Netflix for about 3 months. Author: Ryan Blue <blue@apache.org> Closes #21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.
This commit is contained in:
parent
7e7350285d
commit
cac9b1dea1
|
@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
|
|||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
parquet-column-1.8.2.jar
|
||||
parquet-common-1.8.2.jar
|
||||
parquet-encoding-1.8.2.jar
|
||||
parquet-format-2.3.1.jar
|
||||
parquet-hadoop-1.8.2.jar
|
||||
parquet-column-1.10.0.jar
|
||||
parquet-common-1.10.0.jar
|
||||
parquet-encoding-1.10.0.jar
|
||||
parquet-format-2.4.0.jar
|
||||
parquet-hadoop-1.10.0.jar
|
||||
parquet-hadoop-bundle-1.6.0.jar
|
||||
parquet-jackson-1.8.2.jar
|
||||
parquet-jackson-1.10.0.jar
|
||||
protobuf-java-2.5.0.jar
|
||||
py4j-0.10.6.jar
|
||||
pyrolite-4.13.jar
|
||||
|
|
|
@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
|
|||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
parquet-column-1.8.2.jar
|
||||
parquet-common-1.8.2.jar
|
||||
parquet-encoding-1.8.2.jar
|
||||
parquet-format-2.3.1.jar
|
||||
parquet-hadoop-1.8.2.jar
|
||||
parquet-column-1.10.0.jar
|
||||
parquet-common-1.10.0.jar
|
||||
parquet-encoding-1.10.0.jar
|
||||
parquet-format-2.4.0.jar
|
||||
parquet-hadoop-1.10.0.jar
|
||||
parquet-hadoop-bundle-1.6.0.jar
|
||||
parquet-jackson-1.8.2.jar
|
||||
parquet-jackson-1.10.0.jar
|
||||
protobuf-java-2.5.0.jar
|
||||
py4j-0.10.6.jar
|
||||
pyrolite-4.13.jar
|
||||
|
|
|
@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
|
|||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
parquet-column-1.8.2.jar
|
||||
parquet-common-1.8.2.jar
|
||||
parquet-encoding-1.8.2.jar
|
||||
parquet-format-2.3.1.jar
|
||||
parquet-hadoop-1.8.2.jar
|
||||
parquet-column-1.10.0.jar
|
||||
parquet-common-1.10.0.jar
|
||||
parquet-encoding-1.10.0.jar
|
||||
parquet-format-2.4.0.jar
|
||||
parquet-hadoop-1.10.0.jar
|
||||
parquet-hadoop-bundle-1.6.0.jar
|
||||
parquet-jackson-1.8.2.jar
|
||||
parquet-jackson-1.10.0.jar
|
||||
protobuf-java-2.5.0.jar
|
||||
py4j-0.10.6.jar
|
||||
pyrolite-4.13.jar
|
||||
|
|
|
@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
|
|||
Sets the compression codec used when writing Parquet files. If either `compression` or
|
||||
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
|
||||
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
|
||||
none, uncompressed, snappy, gzip, lzo.
|
||||
none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
8
pom.xml
8
pom.xml
|
@ -129,7 +129,7 @@
|
|||
<!-- Version used for internal directory structure -->
|
||||
<hive.version.short>1.2.1</hive.version.short>
|
||||
<derby.version>10.12.1.1</derby.version>
|
||||
<parquet.version>1.8.2</parquet.version>
|
||||
<parquet.version>1.10.0</parquet.version>
|
||||
<orc.version>1.4.3</orc.version>
|
||||
<orc.classifier>nohive</orc.classifier>
|
||||
<hive.parquet.version>1.6.0</hive.parquet.version>
|
||||
|
@ -1778,6 +1778,12 @@
|
|||
<artifactId>parquet-hadoop</artifactId>
|
||||
<version>${parquet.version}</version>
|
||||
<scope>${parquet.deps.scope}</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-pool</groupId>
|
||||
<artifactId>commons-pool</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.parquet</groupId>
|
||||
|
|
|
@ -345,7 +345,7 @@ object SQLConf {
|
|||
"snappy, gzip, lzo.")
|
||||
.stringConf
|
||||
.transform(_.toLowerCase(Locale.ROOT))
|
||||
.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
|
||||
.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
|
||||
.createWithDefault("snappy")
|
||||
|
||||
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
|
||||
|
|
|
@ -293,7 +293,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
|
|||
return new RLEIntIterator(
|
||||
new RunLengthBitPackingHybridDecoder(
|
||||
BytesUtils.getWidthFromMaxInt(maxLevel),
|
||||
new ByteArrayInputStream(bytes.toByteArray())));
|
||||
bytes.toInputStream()));
|
||||
} catch (IOException e) {
|
||||
throw new IOException("could not read levels in page for col " + descriptor, e);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import org.apache.parquet.bytes.ByteBufferInputStream;
|
||||
import org.apache.parquet.bytes.BytesInput;
|
||||
import org.apache.parquet.bytes.BytesUtils;
|
||||
import org.apache.parquet.column.ColumnDescriptor;
|
||||
import org.apache.parquet.column.Dictionary;
|
||||
|
@ -388,7 +390,8 @@ public class VectorizedColumnReader {
|
|||
* is guaranteed that num is smaller than the number of values left in the current page.
|
||||
*/
|
||||
|
||||
private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
|
||||
throws IOException {
|
||||
if (column.dataType() != DataTypes.BooleanType) {
|
||||
throw constructConvertNotSupportedException(descriptor, column);
|
||||
}
|
||||
|
@ -396,7 +399,7 @@ public class VectorizedColumnReader {
|
|||
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
|
||||
}
|
||||
|
||||
private void readIntBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
|
||||
// This is where we implement support for the valid type conversions.
|
||||
// TODO: implement remaining type conversions
|
||||
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
|
||||
|
@ -414,7 +417,7 @@ public class VectorizedColumnReader {
|
|||
}
|
||||
}
|
||||
|
||||
private void readLongBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
|
||||
// This is where we implement support for the valid type conversions.
|
||||
if (column.dataType() == DataTypes.LongType ||
|
||||
DecimalType.is64BitDecimalType(column.dataType()) ||
|
||||
|
@ -434,7 +437,7 @@ public class VectorizedColumnReader {
|
|||
}
|
||||
}
|
||||
|
||||
private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
|
||||
// This is where we implement support for the valid type conversions.
|
||||
// TODO: support implicit cast to double?
|
||||
if (column.dataType() == DataTypes.FloatType) {
|
||||
|
@ -445,7 +448,7 @@ public class VectorizedColumnReader {
|
|||
}
|
||||
}
|
||||
|
||||
private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
|
||||
// This is where we implement support for the valid type conversions.
|
||||
// TODO: implement remaining type conversions
|
||||
if (column.dataType() == DataTypes.DoubleType) {
|
||||
|
@ -456,7 +459,7 @@ public class VectorizedColumnReader {
|
|||
}
|
||||
}
|
||||
|
||||
private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
|
||||
private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
|
||||
// This is where we implement support for the valid type conversions.
|
||||
// TODO: implement remaining type conversions
|
||||
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
|
||||
|
@ -556,7 +559,7 @@ public class VectorizedColumnReader {
|
|||
});
|
||||
}
|
||||
|
||||
private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
|
||||
private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
|
||||
this.endOfPageValueCount = valuesRead + pageValueCount;
|
||||
if (dataEncoding.usesDictionary()) {
|
||||
this.dataColumn = null;
|
||||
|
@ -581,7 +584,7 @@ public class VectorizedColumnReader {
|
|||
}
|
||||
|
||||
try {
|
||||
dataColumn.initFromPage(pageValueCount, bytes, offset);
|
||||
dataColumn.initFromPage(pageValueCount, in);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("could not read page in col " + descriptor, e);
|
||||
}
|
||||
|
@ -602,12 +605,11 @@ public class VectorizedColumnReader {
|
|||
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
|
||||
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
|
||||
try {
|
||||
byte[] bytes = page.getBytes().toByteArray();
|
||||
rlReader.initFromPage(pageValueCount, bytes, 0);
|
||||
int next = rlReader.getNextOffset();
|
||||
dlReader.initFromPage(pageValueCount, bytes, next);
|
||||
next = dlReader.getNextOffset();
|
||||
initDataReader(page.getValueEncoding(), bytes, next);
|
||||
BytesInput bytes = page.getBytes();
|
||||
ByteBufferInputStream in = bytes.toInputStream();
|
||||
rlReader.initFromPage(pageValueCount, in);
|
||||
dlReader.initFromPage(pageValueCount, in);
|
||||
initDataReader(page.getValueEncoding(), in);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("could not read page " + page + " in col " + descriptor, e);
|
||||
}
|
||||
|
@ -619,12 +621,13 @@ public class VectorizedColumnReader {
|
|||
page.getRepetitionLevels(), descriptor);
|
||||
|
||||
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
|
||||
this.defColumn = new VectorizedRleValuesReader(bitWidth);
|
||||
// do not read the length from the stream. v2 pages handle dividing the page bytes.
|
||||
this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
|
||||
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
|
||||
this.defColumn.initFromBuffer(
|
||||
this.pageValueCount, page.getDefinitionLevels().toByteArray());
|
||||
this.defColumn.initFromPage(
|
||||
this.pageValueCount, page.getDefinitionLevels().toInputStream());
|
||||
try {
|
||||
initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
|
||||
initDataReader(page.getDataEncoding(), page.getData().toInputStream());
|
||||
} catch (IOException e) {
|
||||
throw new IOException("could not read page " + page + " in col " + descriptor, e);
|
||||
}
|
||||
|
|
|
@ -20,34 +20,30 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
|
||||
import org.apache.parquet.bytes.ByteBufferInputStream;
|
||||
import org.apache.parquet.io.ParquetDecodingException;
|
||||
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
|
||||
import org.apache.spark.unsafe.Platform;
|
||||
|
||||
import org.apache.parquet.column.values.ValuesReader;
|
||||
import org.apache.parquet.io.api.Binary;
|
||||
import org.apache.spark.unsafe.Platform;
|
||||
|
||||
/**
|
||||
* An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
|
||||
*/
|
||||
public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
|
||||
private byte[] buffer;
|
||||
private int offset;
|
||||
private int bitOffset; // Only used for booleans.
|
||||
private ByteBuffer byteBuffer; // used to wrap the byte array buffer
|
||||
private ByteBufferInputStream in = null;
|
||||
|
||||
private static final boolean bigEndianPlatform =
|
||||
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
|
||||
// Only used for booleans.
|
||||
private int bitOffset;
|
||||
private byte currentByte = 0;
|
||||
|
||||
public VectorizedPlainValuesReader() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
|
||||
this.buffer = bytes;
|
||||
this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
|
||||
if (bigEndianPlatform) {
|
||||
byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
|
||||
}
|
||||
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -63,115 +59,157 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
|
|||
}
|
||||
}
|
||||
|
||||
private ByteBuffer getBuffer(int length) {
|
||||
try {
|
||||
return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
|
||||
} catch (IOException e) {
|
||||
throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readIntegers(int total, WritableColumnVector c, int rowId) {
|
||||
c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
offset += 4 * total;
|
||||
int requiredBytes = total * 4;
|
||||
ByteBuffer buffer = getBuffer(requiredBytes);
|
||||
|
||||
if (buffer.hasArray()) {
|
||||
int offset = buffer.arrayOffset() + buffer.position();
|
||||
c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
|
||||
} else {
|
||||
for (int i = 0; i < total; i += 1) {
|
||||
c.putInt(rowId + i, buffer.getInt());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readLongs(int total, WritableColumnVector c, int rowId) {
|
||||
c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
offset += 8 * total;
|
||||
int requiredBytes = total * 8;
|
||||
ByteBuffer buffer = getBuffer(requiredBytes);
|
||||
|
||||
if (buffer.hasArray()) {
|
||||
int offset = buffer.arrayOffset() + buffer.position();
|
||||
c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
|
||||
} else {
|
||||
for (int i = 0; i < total; i += 1) {
|
||||
c.putLong(rowId + i, buffer.getLong());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readFloats(int total, WritableColumnVector c, int rowId) {
|
||||
c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
offset += 4 * total;
|
||||
int requiredBytes = total * 4;
|
||||
ByteBuffer buffer = getBuffer(requiredBytes);
|
||||
|
||||
if (buffer.hasArray()) {
|
||||
int offset = buffer.arrayOffset() + buffer.position();
|
||||
c.putFloats(rowId, total, buffer.array(), offset);
|
||||
} else {
|
||||
for (int i = 0; i < total; i += 1) {
|
||||
c.putFloat(rowId + i, buffer.getFloat());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readDoubles(int total, WritableColumnVector c, int rowId) {
|
||||
c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
offset += 8 * total;
|
||||
int requiredBytes = total * 8;
|
||||
ByteBuffer buffer = getBuffer(requiredBytes);
|
||||
|
||||
if (buffer.hasArray()) {
|
||||
int offset = buffer.arrayOffset() + buffer.position();
|
||||
c.putDoubles(rowId, total, buffer.array(), offset);
|
||||
} else {
|
||||
for (int i = 0; i < total; i += 1) {
|
||||
c.putDouble(rowId + i, buffer.getDouble());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readBytes(int total, WritableColumnVector c, int rowId) {
|
||||
for (int i = 0; i < total; i++) {
|
||||
// Bytes are stored as a 4-byte little endian int. Just read the first byte.
|
||||
// TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
|
||||
c.putByte(rowId + i, Platform.getByte(buffer, offset));
|
||||
offset += 4;
|
||||
// Bytes are stored as a 4-byte little endian int. Just read the first byte.
|
||||
// TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
|
||||
int requiredBytes = total * 4;
|
||||
ByteBuffer buffer = getBuffer(requiredBytes);
|
||||
|
||||
for (int i = 0; i < total; i += 1) {
|
||||
c.putByte(rowId + i, buffer.get());
|
||||
// skip the next 3 bytes
|
||||
buffer.position(buffer.position() + 3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean readBoolean() {
|
||||
byte b = Platform.getByte(buffer, offset);
|
||||
boolean v = (b & (1 << bitOffset)) != 0;
|
||||
// TODO: vectorize decoding and keep boolean[] instead of currentByte
|
||||
if (bitOffset == 0) {
|
||||
try {
|
||||
currentByte = (byte) in.read();
|
||||
} catch (IOException e) {
|
||||
throw new ParquetDecodingException("Failed to read a byte", e);
|
||||
}
|
||||
}
|
||||
|
||||
boolean v = (currentByte & (1 << bitOffset)) != 0;
|
||||
bitOffset += 1;
|
||||
if (bitOffset == 8) {
|
||||
bitOffset = 0;
|
||||
offset++;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int readInteger() {
|
||||
int v = Platform.getInt(buffer, offset);
|
||||
if (bigEndianPlatform) {
|
||||
v = java.lang.Integer.reverseBytes(v);
|
||||
}
|
||||
offset += 4;
|
||||
return v;
|
||||
return getBuffer(4).getInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long readLong() {
|
||||
long v = Platform.getLong(buffer, offset);
|
||||
if (bigEndianPlatform) {
|
||||
v = java.lang.Long.reverseBytes(v);
|
||||
}
|
||||
offset += 8;
|
||||
return v;
|
||||
return getBuffer(8).getLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final byte readByte() {
|
||||
return (byte)readInteger();
|
||||
return (byte) readInteger();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final float readFloat() {
|
||||
float v;
|
||||
if (!bigEndianPlatform) {
|
||||
v = Platform.getFloat(buffer, offset);
|
||||
} else {
|
||||
v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
}
|
||||
offset += 4;
|
||||
return v;
|
||||
return getBuffer(4).getFloat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final double readDouble() {
|
||||
double v;
|
||||
if (!bigEndianPlatform) {
|
||||
v = Platform.getDouble(buffer, offset);
|
||||
} else {
|
||||
v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
|
||||
}
|
||||
offset += 8;
|
||||
return v;
|
||||
return getBuffer(8).getDouble();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readBinary(int total, WritableColumnVector v, int rowId) {
|
||||
for (int i = 0; i < total; i++) {
|
||||
int len = readInteger();
|
||||
int start = offset;
|
||||
offset += len;
|
||||
v.putByteArray(rowId + i, buffer, start - Platform.BYTE_ARRAY_OFFSET, len);
|
||||
ByteBuffer buffer = getBuffer(len);
|
||||
if (buffer.hasArray()) {
|
||||
v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
|
||||
} else {
|
||||
byte[] bytes = new byte[len];
|
||||
buffer.get(bytes);
|
||||
v.putByteArray(rowId + i, bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Binary readBinary(int len) {
|
||||
Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
|
||||
offset += len;
|
||||
return result;
|
||||
ByteBuffer buffer = getBuffer(len);
|
||||
if (buffer.hasArray()) {
|
||||
return Binary.fromConstantByteArray(
|
||||
buffer.array(), buffer.arrayOffset() + buffer.position(), len);
|
||||
} else {
|
||||
byte[] bytes = new byte[len];
|
||||
buffer.get(bytes);
|
||||
return Binary.fromConstantByteArray(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.execution.datasources.parquet;
|
||||
|
||||
import org.apache.parquet.Preconditions;
|
||||
import org.apache.parquet.bytes.ByteBufferInputStream;
|
||||
import org.apache.parquet.bytes.BytesUtils;
|
||||
import org.apache.parquet.column.values.ValuesReader;
|
||||
import org.apache.parquet.column.values.bitpacking.BytePacker;
|
||||
|
@ -27,6 +28,9 @@ import org.apache.parquet.io.api.Binary;
|
|||
|
||||
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* A values reader for Parquet's run-length encoded data. This is based off of the version in
|
||||
* parquet-mr with these changes:
|
||||
|
@ -49,9 +53,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
}
|
||||
|
||||
// Encoded data.
|
||||
private byte[] in;
|
||||
private int end;
|
||||
private int offset;
|
||||
private ByteBufferInputStream in;
|
||||
|
||||
// bit/byte width of decoded data and utility to batch unpack them.
|
||||
private int bitWidth;
|
||||
|
@ -70,28 +72,39 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
// If true, the bit width is fixed. This decoder is used in different places and this also
|
||||
// controls if we need to read the bitwidth from the beginning of the data stream.
|
||||
private final boolean fixedWidth;
|
||||
private final boolean readLength;
|
||||
|
||||
public VectorizedRleValuesReader() {
|
||||
fixedWidth = false;
|
||||
this.fixedWidth = false;
|
||||
this.readLength = false;
|
||||
}
|
||||
|
||||
public VectorizedRleValuesReader(int bitWidth) {
|
||||
fixedWidth = true;
|
||||
this.fixedWidth = true;
|
||||
this.readLength = bitWidth != 0;
|
||||
init(bitWidth);
|
||||
}
|
||||
|
||||
public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
|
||||
this.fixedWidth = true;
|
||||
this.readLength = readLength;
|
||||
init(bitWidth);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initFromPage(int valueCount, byte[] page, int start) {
|
||||
this.offset = start;
|
||||
this.in = page;
|
||||
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
|
||||
this.in = in;
|
||||
if (fixedWidth) {
|
||||
if (bitWidth != 0) {
|
||||
// initialize for repetition and definition levels
|
||||
if (readLength) {
|
||||
int length = readIntLittleEndian();
|
||||
this.end = this.offset + length;
|
||||
this.in = in.sliceStream(length);
|
||||
}
|
||||
} else {
|
||||
this.end = page.length;
|
||||
if (this.end != this.offset) init(page[this.offset++] & 255);
|
||||
// initialize for values
|
||||
if (in.available() > 0) {
|
||||
init(in.read());
|
||||
}
|
||||
}
|
||||
if (bitWidth == 0) {
|
||||
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
|
||||
|
@ -103,22 +116,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
}
|
||||
}
|
||||
|
||||
// Initialize the reader from a buffer. This is used for the V2 page encoding where the
|
||||
// definition are in its own buffer.
|
||||
public void initFromBuffer(int valueCount, byte[] data) {
|
||||
this.offset = 0;
|
||||
this.in = data;
|
||||
this.end = data.length;
|
||||
if (bitWidth == 0) {
|
||||
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
|
||||
this.mode = MODE.RLE;
|
||||
this.currentCount = valueCount;
|
||||
this.currentValue = 0;
|
||||
} else {
|
||||
this.currentCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the internal state for decoding ints of `bitWidth`.
|
||||
*/
|
||||
|
@ -129,11 +126,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNextOffset() {
|
||||
return this.end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readBoolean() {
|
||||
return this.readInteger() != 0;
|
||||
|
@ -182,7 +174,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -217,7 +209,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -251,7 +243,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -285,7 +277,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -321,7 +313,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -355,7 +347,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -389,7 +381,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -423,7 +415,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector c,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -462,7 +454,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
WritableColumnVector nulls,
|
||||
int rowId,
|
||||
int level,
|
||||
VectorizedValuesReader data) {
|
||||
VectorizedValuesReader data) throws IOException {
|
||||
int left = total;
|
||||
while (left > 0) {
|
||||
if (this.currentCount == 0) this.readNextGroup();
|
||||
|
@ -559,12 +551,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
/**
|
||||
* Reads the next varint encoded int.
|
||||
*/
|
||||
private int readUnsignedVarInt() {
|
||||
private int readUnsignedVarInt() throws IOException {
|
||||
int value = 0;
|
||||
int shift = 0;
|
||||
int b;
|
||||
do {
|
||||
b = in[offset++] & 255;
|
||||
b = in.read();
|
||||
value |= (b & 0x7F) << shift;
|
||||
shift += 7;
|
||||
} while ((b & 0x80) != 0);
|
||||
|
@ -574,35 +566,32 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
/**
|
||||
* Reads the next 4 byte little endian int.
|
||||
*/
|
||||
private int readIntLittleEndian() {
|
||||
int ch4 = in[offset] & 255;
|
||||
int ch3 = in[offset + 1] & 255;
|
||||
int ch2 = in[offset + 2] & 255;
|
||||
int ch1 = in[offset + 3] & 255;
|
||||
offset += 4;
|
||||
private int readIntLittleEndian() throws IOException {
|
||||
int ch4 = in.read();
|
||||
int ch3 = in.read();
|
||||
int ch2 = in.read();
|
||||
int ch1 = in.read();
|
||||
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the next byteWidth little endian int.
|
||||
*/
|
||||
private int readIntLittleEndianPaddedOnBitWidth() {
|
||||
private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
|
||||
switch (bytesWidth) {
|
||||
case 0:
|
||||
return 0;
|
||||
case 1:
|
||||
return in[offset++] & 255;
|
||||
return in.read();
|
||||
case 2: {
|
||||
int ch2 = in[offset] & 255;
|
||||
int ch1 = in[offset + 1] & 255;
|
||||
offset += 2;
|
||||
int ch2 = in.read();
|
||||
int ch1 = in.read();
|
||||
return (ch1 << 8) + ch2;
|
||||
}
|
||||
case 3: {
|
||||
int ch3 = in[offset] & 255;
|
||||
int ch2 = in[offset + 1] & 255;
|
||||
int ch1 = in[offset + 2] & 255;
|
||||
offset += 3;
|
||||
int ch3 = in.read();
|
||||
int ch2 = in.read();
|
||||
int ch1 = in.read();
|
||||
return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
|
||||
}
|
||||
case 4: {
|
||||
|
@ -619,32 +608,36 @@ public final class VectorizedRleValuesReader extends ValuesReader
|
|||
/**
|
||||
* Reads the next group.
|
||||
*/
|
||||
private void readNextGroup() {
|
||||
int header = readUnsignedVarInt();
|
||||
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
|
||||
switch (mode) {
|
||||
case RLE:
|
||||
this.currentCount = header >>> 1;
|
||||
this.currentValue = readIntLittleEndianPaddedOnBitWidth();
|
||||
return;
|
||||
case PACKED:
|
||||
int numGroups = header >>> 1;
|
||||
this.currentCount = numGroups * 8;
|
||||
int bytesToRead = ceil8(this.currentCount * this.bitWidth);
|
||||
private void readNextGroup() {
|
||||
try {
|
||||
int header = readUnsignedVarInt();
|
||||
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
|
||||
switch (mode) {
|
||||
case RLE:
|
||||
this.currentCount = header >>> 1;
|
||||
this.currentValue = readIntLittleEndianPaddedOnBitWidth();
|
||||
return;
|
||||
case PACKED:
|
||||
int numGroups = header >>> 1;
|
||||
this.currentCount = numGroups * 8;
|
||||
|
||||
if (this.currentBuffer.length < this.currentCount) {
|
||||
this.currentBuffer = new int[this.currentCount];
|
||||
}
|
||||
currentBufferIdx = 0;
|
||||
int valueIndex = 0;
|
||||
for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
|
||||
this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
|
||||
valueIndex += 8;
|
||||
}
|
||||
offset += bytesToRead;
|
||||
return;
|
||||
default:
|
||||
throw new ParquetDecodingException("not a valid mode " + this.mode);
|
||||
if (this.currentBuffer.length < this.currentCount) {
|
||||
this.currentBuffer = new int[this.currentCount];
|
||||
}
|
||||
currentBufferIdx = 0;
|
||||
int valueIndex = 0;
|
||||
while (valueIndex < this.currentCount) {
|
||||
// values are bit packed 8 at a time, so reading bitWidth will always work
|
||||
ByteBuffer buffer = in.slice(bitWidth);
|
||||
this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
|
||||
valueIndex += 8;
|
||||
}
|
||||
return;
|
||||
default:
|
||||
throw new ParquetDecodingException("not a valid mode " + this.mode);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ParquetDecodingException("Failed to read from input stream", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,7 +81,10 @@ object ParquetOptions {
|
|||
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
|
||||
"snappy" -> CompressionCodecName.SNAPPY,
|
||||
"gzip" -> CompressionCodecName.GZIP,
|
||||
"lzo" -> CompressionCodecName.LZO)
|
||||
"lzo" -> CompressionCodecName.LZO,
|
||||
"lz4" -> CompressionCodecName.LZ4,
|
||||
"brotli" -> CompressionCodecName.BROTLI,
|
||||
"zstd" -> CompressionCodecName.ZSTD)
|
||||
|
||||
def getParquetCompressionCodecName(name: String): String = {
|
||||
shortParquetCompressionCodecNames(name).name()
|
||||
|
|
|
@ -89,7 +89,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-08-01, hr=10]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
|
||||
Partition Statistics 1067 bytes, 3 rows
|
||||
Partition Statistics 1121 bytes, 3 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
@ -122,7 +122,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-08-01, hr=10]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
|
||||
Partition Statistics 1067 bytes, 3 rows
|
||||
Partition Statistics 1121 bytes, 3 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
@ -147,7 +147,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-08-01, hr=11]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
|
||||
Partition Statistics 1080 bytes, 4 rows
|
||||
Partition Statistics 1098 bytes, 4 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
@ -180,7 +180,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-08-01, hr=10]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
|
||||
Partition Statistics 1067 bytes, 3 rows
|
||||
Partition Statistics 1121 bytes, 3 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
@ -205,7 +205,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-08-01, hr=11]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
|
||||
Partition Statistics 1080 bytes, 4 rows
|
||||
Partition Statistics 1098 bytes, 4 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
@ -230,7 +230,7 @@ Database default
|
|||
Table t
|
||||
Partition Values [ds=2017-09-01, hr=5]
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5
|
||||
Partition Statistics 1054 bytes, 2 rows
|
||||
Partition Statistics 1144 bytes, 2 rows
|
||||
|
||||
# Storage Information
|
||||
Location [not included in comparison]sql/core/spark-warehouse/t
|
||||
|
|
|
@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
|
|||
case plan: InMemoryRelation => plan
|
||||
}.head
|
||||
// InMemoryRelation's stats is file size before the underlying RDD is materialized
|
||||
assert(inMemoryRelation.computeStats().sizeInBytes === 740)
|
||||
assert(inMemoryRelation.computeStats().sizeInBytes === 800)
|
||||
|
||||
// InMemoryRelation's stats is updated after materializing RDD
|
||||
dfFromFile.collect()
|
||||
|
@ -516,7 +516,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
|
|||
|
||||
// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
|
||||
// is calculated
|
||||
assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
|
||||
assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
|
||||
|
||||
// InMemoryRelation's stats should be updated after calculating stats of the table
|
||||
// clear cache to simulate a fresh environment
|
||||
|
|
Loading…
Reference in a new issue