[SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly

### What changes were proposed in this pull request?

Fix the skipping values logic in Parquet vectorized reader when column index is effective, by considering nulls and only call `ParquetVectorUpdater.skipValues` when the values are non-null.

### Why are the changes needed?

Currently, the Parquet vectorized reader may not work correctly if column index filtering is effective, and the data page contains null values. For instance, let's say we have two columns `c1: BIGINT` and `c2: STRING`, and the following pages:
```
   * c1        500       500       500       500
   *  |---------|---------|---------|---------|
   *  |-------|-----|-----|---|---|---|---|---|
   * c2     400   300   300 200 200 200 200 200
```

and suppose we have a query like the following:
```sql
SELECT * FROM t WHERE c1 = 500
```

this will create a Parquet row range `[500, 1000)` which, when applied to `c2`, will require us to skip all the rows in `[400,500)`. However the current logic for skipping rows is via `updater.skipValues(n, valueReader)` which is incorrect since this skips the next `n` non-null values. In the case when nulls are present, this will not work correctly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test in `ParquetColumnIndexSuite`.

Closes #33330 from sunchao/SPARK-36123-skip-nulls.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Chao Sun 2021-07-14 18:14:17 +08:00 committed by Wenchen Fan
parent b86645776b
commit e980c7a840
2 changed files with 54 additions and 20 deletions

View file

@ -203,8 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
long rangeEnd = state.currentRangeEnd();
if (rowId + n < rangeStart) {
updater.skipValues(n, valueReader);
advance(n);
skipValues(n, state, valueReader, updater);
rowId += n;
leftInPage -= n;
} else if (rowId > rangeEnd) {
@ -217,8 +216,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
// skip the part [rowId, start)
int toSkip = (int) (start - rowId);
if (toSkip > 0) {
updater.skipValues(toSkip, valueReader);
advance(toSkip);
skipValues(toSkip, state, valueReader, updater);
rowId += toSkip;
leftInPage -= toSkip;
}
@ -255,6 +253,39 @@ public final class VectorizedRleValuesReader extends ValuesReader
state.advanceOffsetAndRowId(offset, rowId);
}
/**
* Skip the next `n` values (either null or non-null) from this definition level reader and
* `valueReader`.
*/
private void skipValues(
int n,
ParquetReadState state,
VectorizedValuesReader valuesReader,
ParquetVectorUpdater updater) {
while (n > 0) {
if (this.currentCount == 0) this.readNextGroup();
int num = Math.min(n, this.currentCount);
switch (mode) {
case RLE:
// we only need to skip non-null values from `valuesReader` since nulls are represented
// via definition levels which are skipped here via decrementing `currentCount`.
if (currentValue == state.maxDefinitionLevel) {
updater.skipValues(num, valuesReader);
}
break;
case PACKED:
for (int i = 0; i < num; ++i) {
// same as above, only skip non-null values from `valuesReader`
if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
updater.skipValues(1, valuesReader);
}
}
break;
}
currentCount -= num;
n -= num;
}
}
// The RLE reader implements the vectorized decoding interface when used to decode dictionary
// IDs. This is different than the above APIs that decodes definitions levels along with values.
@ -358,7 +389,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
advance(n);
switch (mode) {
case RLE:
break;
case PACKED:
currentBufferIdx += n;
break;
}
currentCount -= n;
left -= n;
}
}
@ -403,20 +441,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new UnsupportedOperationException("only skipIntegers is valid");
}
/**
* Advance and skip the next `n` values in the current block. `n` MUST be <= `currentCount`.
*/
private void advance(int n) {
switch (mode) {
case RLE:
break;
case PACKED:
currentBufferIdx += n;
break;
}
currentCount -= n;
}
/**
* Reads the next varint encoded int.
*/

View file

@ -38,7 +38,7 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
* col_1 500 500 500 500
* |---------|---------|---------|---------|
* |-------|-----|-----|---|---|---|---|---|
* col_2 400 300 200 200 200 200 200 200
* col_2 400 300 300 200 200 200 200 200
*/
def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
Seq(true, false).foreach { enableDictionary =>
@ -92,4 +92,14 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
)
checkUnalignedPages(df)(actions: _*)
}
test("SPARK-36123: reading from unaligned pages - test filters with nulls") {
// insert 50 null values in [400, 450) to verify that they are skipped during processing row
// range [500, 1000) against the second page of col_2 [400, 800)
var df = spark.range(0, 2000).map { i =>
val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt
(i, strVal)
}.toDF()
checkUnalignedPages(df)(actions: _*)
}
}