From 5b2f1912280e7a5afb92a96b894a7bc5f263aa6e Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Mon, 12 Jul 2021 22:30:21 +0800
Subject: [PATCH] [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader

### What changes were proposed in this pull request?

Combine `readBatch` and `readIntegers` in `VectorizedRleValuesReader` by having them share the same `readBatchInternal` method.

### Why are the changes needed?

`readBatch` and `readIntegers` share a similar code path, and this Jira aims to combine them into one method for easier maintenance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests, as this is just a refactoring.

Closes #33271 from sunchao/SPARK-35743-read-integers.

Authored-by: Chao Sun
Signed-off-by: Wenchen Fan
(cherry picked from commit 5edbbd1711402735623fa1fc9b86ff41c28996e9)
Signed-off-by: Wenchen Fan
---
 .../parquet/ParquetVectorUpdaterFactory.java |   2 +-
 .../parquet/VectorizedRleValuesReader.java   | 104 +++++------------
 2 files changed, 27 insertions(+), 79 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 2282dc7984..39de909171 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -216,7 +216,7 @@ public class ParquetVectorUpdaterFactory {
     }
   }
 
-  private static class IntegerUpdater implements ParquetVectorUpdater {
+  static class IntegerUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
         int total,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 03bda0fedb..9d88039457 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -167,7 +167,29 @@ public final class VectorizedRleValuesReader extends ValuesReader
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
-      ParquetVectorUpdater updater) throws IOException {
+      ParquetVectorUpdater updater) {
+    readBatchInternal(state, values, values, valueReader, updater);
+  }
+
+  /**
+   * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+   * populated into `nulls`.
+ */ + public void readIntegers( + ParquetReadState state, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader data) { + readBatchInternal(state, values, nulls, data, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + private void readBatchInternal( + ParquetReadState state, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + int offset = state.offset; long rowId = state.rowId; int leftInBatch = state.valuesToReadInBatch; @@ -209,97 +231,23 @@ public final class VectorizedRleValuesReader extends ValuesReader if (currentValue == state.maxDefinitionLevel) { updater.readValues(n, offset, values, valueReader); } else { - values.putNulls(offset, n); + nulls.putNulls(offset, n); } break; case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { updater.readValue(offset + i, values, valueReader); - } else { - values.putNull(offset + i); - } - } - break; - } - offset += n; - leftInBatch -= n; - rowId += n; - leftInPage -= n; - currentCount -= n; - } - } - - state.advanceOffsetAndRowId(offset, rowId); - } - - /** - * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is - * populated into `nulls`. - */ - public void readIntegers( - ParquetReadState state, - WritableColumnVector values, - WritableColumnVector nulls, - VectorizedValuesReader data) throws IOException { - int offset = state.offset; - long rowId = state.rowId; - int leftInBatch = state.valuesToReadInBatch; - int leftInPage = state.valuesToReadInPage; - - while (leftInBatch > 0 && leftInPage > 0) { - if (this.currentCount == 0) this.readNextGroup(); - int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount)); - - long rangeStart = state.currentRangeStart(); - long rangeEnd = state.currentRangeEnd(); - - if (rowId + n < rangeStart) { - data.skipIntegers(n); - advance(n); - rowId += n; - leftInPage -= n; - } else if (rowId > rangeEnd) { - state.nextRange(); - } else { - // the range [rowId, rowId + n) overlaps with the current row range in state - long start = Math.max(rangeStart, rowId); - long end = Math.min(rangeEnd, rowId + n - 1); - - // skip the part [rowId, start) - int toSkip = (int) (start - rowId); - if (toSkip > 0) { - data.skipIntegers(toSkip); - advance(toSkip); - rowId += toSkip; - leftInPage -= toSkip; - } - - // read the part [start, end] - n = (int) (end - start + 1); - - switch (mode) { - case RLE: - if (currentValue == state.maxDefinitionLevel) { - data.readIntegers(n, values, offset); - } else { - nulls.putNulls(offset, n); - } - break; - case PACKED: - for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - values.putInt(offset + i, data.readInteger()); } else { nulls.putNull(offset + i); } } break; } - rowId += n; - leftInPage -= n; offset += n; leftInBatch -= n; + rowId += n; + leftInPage -= n; currentCount -= n; } }
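
For review context, a condensed sketch of the consolidation pattern this patch applies: two public entry points that differ only in which vector receives the nulls and which updater writes the values, both delegating to one shared internal loop. All types below (`SimpleVector`, `Updater`, `IntUpdater`) are simplified, hypothetical stand-ins rather than the real Spark classes, and the sketch ignores the Parquet read state (row ranges, RLE/PACKED runs) that the real `readBatchInternal` threads through its loop.

```java
// Self-contained sketch of the refactoring pattern: two public entry points
// delegating to one private method. All types here are simplified stand-ins
// for the Spark classes, not the actual Spark API.
import java.util.Arrays;
import java.util.Iterator;

public class ConsolidationSketch {

  /** Stand-in for WritableColumnVector: an int column plus a null mask. */
  static final class SimpleVector {
    final int[] data;
    final boolean[] isNull;
    SimpleVector(int capacity) {
      data = new int[capacity];
      isNull = new boolean[capacity];
    }
    void putInt(int offset, int value) { data[offset] = value; }
    void putNull(int offset) { isNull[offset] = true; }
  }

  /** Stand-in for ParquetVectorUpdater: writes one decoded value. */
  interface Updater {
    void readValue(int offset, SimpleVector values, Iterator<Integer> reader);
  }

  /** Stand-in for IntegerUpdater: a plain int copy. */
  static final class IntUpdater implements Updater {
    @Override
    public void readValue(int offset, SimpleVector values, Iterator<Integer> reader) {
      values.putInt(offset, reader.next());
    }
  }

  /** Mirrors readBatch: values and nulls live in the same vector. */
  static void readBatch(SimpleVector values, boolean[] defined,
                        Iterator<Integer> reader, Updater updater) {
    readBatchInternal(values, values, defined, reader, updater);
  }

  /** Mirrors readIntegers: ids go to `values`, nullability to `nulls`. */
  static void readIntegers(SimpleVector values, SimpleVector nulls, boolean[] defined,
                           Iterator<Integer> reader) {
    readBatchInternal(values, nulls, defined, reader, new IntUpdater());
  }

  /** The single shared loop; only the null sink and the updater vary. */
  private static void readBatchInternal(SimpleVector values, SimpleVector nulls,
                                        boolean[] defined, Iterator<Integer> reader,
                                        Updater updater) {
    for (int i = 0; i < defined.length; i++) {
      if (defined[i]) {
        updater.readValue(i, values, reader);  // decode one value into `values`
      } else {
        nulls.putNull(i);                      // record the null in `nulls`
      }
    }
  }

  public static void main(String[] args) {
    SimpleVector values = new SimpleVector(4);
    SimpleVector nulls = new SimpleVector(4);
    boolean[] defined = {true, false, true, true};
    readIntegers(values, nulls, defined, Arrays.asList(7, 8, 9).iterator());
    System.out.println(Arrays.toString(values.data));   // [7, 0, 8, 9]
    System.out.println(Arrays.toString(nulls.isNull));  // [false, true, false, false]
  }
}
```

The takeaway mirrors the diff above: `readBatch` passes the same vector twice (values and nullability live together), while `readIntegers` pins the updater to `IntegerUpdater` and routes nullability to a separate `nulls` vector; everything else is shared.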