[SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader

### What changes were proposed in this pull request?

Combine `readBatch` and `readIntegers` in `VectorizedRleValuesReader` by having them share the same `readBatchInternal` method.

### Why are the changes needed?

`readBatch` and `readIntegers` share a very similar code path, and this JIRA combines them into a single method for easier maintenance.
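Before the diff itself, a minimal self-contained sketch may help make the shape of the refactoring concrete. Everything below (`Vector`, `Updater`, the driver values) is a toy stand-in invented for illustration, not Spark's actual `WritableColumnVector`/`ParquetVectorUpdater` API; only the method names mirror the PR:

```java
import java.util.Arrays;

/**
 * Toy sketch of the refactoring pattern: two public entry points that differ
 * only in where nulls are recorded and how a value is materialized, merged
 * into one internal worker parameterized by those differences.
 */
public class CombinedReaderSketch {

  /** Toy stand-in for WritableColumnVector. */
  static class Vector {
    final int[] data;
    final boolean[] isNull;
    Vector(int n) { data = new int[n]; isNull = new boolean[n]; }
    void putInt(int i, int v) { data[i] = v; }
    void putNull(int i) { isNull[i] = true; }
    @Override public String toString() {
      return Arrays.toString(data) + " nulls=" + Arrays.toString(isNull);
    }
  }

  /** Toy stand-in for ParquetVectorUpdater: materializes one decoded value. */
  interface Updater {
    void readValue(int offset, Vector values, int raw);
  }

  /** Toy analogue of IntegerUpdater: stores the raw value (a dictionary id) as-is. */
  static final Updater INTEGER_UPDATER = (offset, values, raw) -> values.putInt(offset, raw);

  static final int MAX_DEF_LEVEL = 1;

  /** Analogue of readBatch: values and nulls share one vector. */
  static void readBatch(int[] defLevels, int[] raw, Vector values, Updater updater) {
    readBatchInternal(defLevels, raw, values, values, updater);
  }

  /** Analogue of readIntegers: ids go into values, nullability into a separate nulls vector. */
  static void readIntegers(int[] defLevels, int[] raw, Vector values, Vector nulls) {
    readBatchInternal(defLevels, raw, values, nulls, INTEGER_UPDATER);
  }

  /** The single shared loop; all per-caller differences live in the parameters. */
  private static void readBatchInternal(
      int[] defLevels, int[] raw, Vector values, Vector nulls, Updater updater) {
    int valueIdx = 0;
    for (int i = 0; i < defLevels.length; i++) {
      if (defLevels[i] == MAX_DEF_LEVEL) {
        updater.readValue(i, values, raw[valueIdx++]); // defined: materialize the value
      } else {
        nulls.putNull(i); // undefined: record only nullability
      }
    }
  }

  public static void main(String[] args) {
    int[] defLevels = {1, 0, 1}; // the middle slot is null
    int[] raw = {7, 9};          // only defined slots carry data
    Vector ids = new Vector(3);
    Vector nulls = new Vector(3);
    readIntegers(defLevels, raw, ids, nulls);
    System.out.println(ids + " | " + nulls);
    // ids:   [7, 0, 9] nulls=[false, false, false]
    // nulls: [0, 0, 0] nulls=[false, true, false]
  }
}
```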

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests, as this is just a refactoring.

Closes #33271 from sunchao/SPARK-35743-read-integers.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5edbbd1711)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 5b2f191228 (parent 4c7ac5fc90), authored by Chao Sun on 2021-07-12 22:30:21 +08:00 and committed by Wenchen Fan.
2 changed files with 27 additions and 79 deletions.

#### ParquetVectorUpdaterFactory.java
```diff
@@ -216,7 +216,7 @@ public class ParquetVectorUpdaterFactory {
     }
   }
 
-  private static class IntegerUpdater implements ParquetVectorUpdater {
+  static class IntegerUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
         int total,
```

`IntegerUpdater` drops its `private` modifier so that `VectorizedRleValuesReader`, which lives in the same package, can instantiate it directly (see `readIntegers` below).
#### VectorizedRleValuesReader.java
```diff
@@ -167,7 +167,29 @@ public final class VectorizedRleValuesReader extends ValuesReader
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
-      ParquetVectorUpdater updater) throws IOException {
+      ParquetVectorUpdater updater) {
+    readBatchInternal(state, values, values, valueReader, updater);
+  }
+
+  /**
+   * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+   * populated into `nulls`.
+   */
+  public void readIntegers(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader data) {
+    readBatchInternal(state, values, nulls, data, new ParquetVectorUpdaterFactory.IntegerUpdater());
+  }
+
+  private void readBatchInternal(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
     int offset = state.offset;
     long rowId = state.rowId;
     int leftInBatch = state.valuesToReadInBatch;
```

Note how the two public entry points now differ only in their arguments: `readBatch` passes `values` twice, so values and nulls land in the same vector, while `readIntegers` keeps a separate `nulls` vector and fixes the updater to `IntegerUpdater`.
```diff
@@ -209,97 +231,23 @@ public final class VectorizedRleValuesReader extends ValuesReader
             if (currentValue == state.maxDefinitionLevel) {
               updater.readValues(n, offset, values, valueReader);
             } else {
-              values.putNulls(offset, n);
+              nulls.putNulls(offset, n);
             }
             break;
           case PACKED:
             for (int i = 0; i < n; ++i) {
               if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
                 updater.readValue(offset + i, values, valueReader);
               } else {
-                values.putNull(offset + i);
+                nulls.putNull(offset + i);
               }
             }
             break;
         }
         offset += n;
         leftInBatch -= n;
         rowId += n;
         leftInPage -= n;
         currentCount -= n;
       }
     }
 
     state.advanceOffsetAndRowId(offset, rowId);
   }
-
-  /**
-   * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
-   * populated into `nulls`.
-   */
-  public void readIntegers(
-      ParquetReadState state,
-      WritableColumnVector values,
-      WritableColumnVector nulls,
-      VectorizedValuesReader data) throws IOException {
-    int offset = state.offset;
-    long rowId = state.rowId;
-    int leftInBatch = state.valuesToReadInBatch;
-    int leftInPage = state.valuesToReadInPage;
-
-    while (leftInBatch > 0 && leftInPage > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
-
-      long rangeStart = state.currentRangeStart();
-      long rangeEnd = state.currentRangeEnd();
-
-      if (rowId + n < rangeStart) {
-        data.skipIntegers(n);
-        advance(n);
-        rowId += n;
-        leftInPage -= n;
-      } else if (rowId > rangeEnd) {
-        state.nextRange();
-      } else {
-        // the range [rowId, rowId + n) overlaps with the current row range in state
-        long start = Math.max(rangeStart, rowId);
-        long end = Math.min(rangeEnd, rowId + n - 1);
-
-        // skip the part [rowId, start)
-        int toSkip = (int) (start - rowId);
-        if (toSkip > 0) {
-          data.skipIntegers(toSkip);
-          advance(toSkip);
-          rowId += toSkip;
-          leftInPage -= toSkip;
-        }
-
-        // read the part [start, end]
-        n = (int) (end - start + 1);
-
-        switch (mode) {
-          case RLE:
-            if (currentValue == state.maxDefinitionLevel) {
-              data.readIntegers(n, values, offset);
-            } else {
-              nulls.putNulls(offset, n);
-            }
-            break;
-          case PACKED:
-            for (int i = 0; i < n; ++i) {
-              if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-                values.putInt(offset + i, data.readInteger());
-              } else {
-                nulls.putNull(offset + i);
-              }
-            }
-            break;
-        }
-        offset += n;
-        leftInBatch -= n;
-        rowId += n;
-        leftInPage -= n;
-        currentCount -= n;
-      }
-    }
-
-    state.advanceOffsetAndRowId(offset, rowId);
-  }
```
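One detail of the shared loop that is easy to miss in the diff is the row-range clipping: the run of `n` available values starting at `rowId` is intersected with the current row range `[rangeStart, rangeEnd]`, rows before the intersection are skipped, and only the overlap is decoded. The arithmetic below is lifted from the method above; the driver values are made up for illustration:

```java
/** Standalone illustration of the row-range clipping used in the reader above. */
public class RangeClipSketch {
  public static void main(String[] args) {
    long rowId = 100, rangeStart = 105, rangeEnd = 130;
    int n = 20; // values available in the current run: rows [100, 120)

    long start = Math.max(rangeStart, rowId);     // 105: first row actually read
    long end = Math.min(rangeEnd, rowId + n - 1); // 119: last row read (inclusive)
    int toSkip = (int) (start - rowId);           // 5 rows skipped, not decoded
    int toRead = (int) (end - start + 1);         // 15 rows decoded into the vector

    System.out.println("skip=" + toSkip + " read=" + toRead); // skip=5 read=15
  }
}
```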