From e980c7a8404ab290998c2dec0e2e2437d675d67c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 14 Jul 2021 18:14:17 +0800 Subject: [PATCH] [SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly ### What changes were proposed in this pull request? Fix the skipping values logic in Parquet vectorized reader when column index is effective, by considering nulls and only call `ParquetVectorUpdater.skipValues` when the values are non-null. ### Why are the changes needed? Currently, the Parquet vectorized reader may not work correctly if column index filtering is effective, and the data page contains null values. For instance, let's say we have two columns `c1: BIGINT` and `c2: STRING`, and the following pages: ``` * c1 500 500 500 500 * |---------|---------|---------|---------| * |-------|-----|-----|---|---|---|---|---| * c2 400 300 300 200 200 200 200 200 ``` and suppose we have a query like the following: ```sql SELECT * FROM t WHERE c1 = 500 ``` this will create a Parquet row range `[500, 1000)` which, when applied to `c2`, will require us to skip all the rows in `[400,500)`. However the current logic for skipping rows is via `updater.skipValues(n, valueReader)` which is incorrect since this skips the next `n` non-null values. In the case when nulls are present, this will not work correctly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test in `ParquetColumnIndexSuite`. Closes #33330 from sunchao/SPARK-36123-skip-nulls. Authored-by: Chao Sun Signed-off-by: Wenchen Fan --- .../parquet/VectorizedRleValuesReader.java | 62 +++++++++++++------ .../parquet/ParquetColumnIndexSuite.scala | 12 +++- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 9d88039457..af739a52d8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -203,8 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader long rangeEnd = state.currentRangeEnd(); if (rowId + n < rangeStart) { - updater.skipValues(n, valueReader); - advance(n); + skipValues(n, state, valueReader, updater); rowId += n; leftInPage -= n; } else if (rowId > rangeEnd) { @@ -217,8 +216,7 @@ public final class VectorizedRleValuesReader extends ValuesReader // skip the part [rowId, start) int toSkip = (int) (start - rowId); if (toSkip > 0) { - updater.skipValues(toSkip, valueReader); - advance(toSkip); + skipValues(toSkip, state, valueReader, updater); rowId += toSkip; leftInPage -= toSkip; } @@ -255,6 +253,39 @@ public final class VectorizedRleValuesReader extends ValuesReader state.advanceOffsetAndRowId(offset, rowId); } + /** + * Skip the next `n` values (either null or non-null) from this definition level reader and + * `valueReader`. + */ + private void skipValues( + int n, + ParquetReadState state, + VectorizedValuesReader valuesReader, + ParquetVectorUpdater updater) { + while (n > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int num = Math.min(n, this.currentCount); + switch (mode) { + case RLE: + // we only need to skip non-null values from `valuesReader` since nulls are represented + // via definition levels which are skipped here via decrementing `currentCount`. + if (currentValue == state.maxDefinitionLevel) { + updater.skipValues(num, valuesReader); + } + break; + case PACKED: + for (int i = 0; i < num; ++i) { + // same as above, only skip non-null values from `valuesReader` + if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { + updater.skipValues(1, valuesReader); + } + } + break; + } + currentCount -= num; + n -= num; + } + } // The RLE reader implements the vectorized decoding interface when used to decode dictionary // IDs. This is different than the above APIs that decodes definitions levels along with values. @@ -358,7 +389,14 @@ public final class VectorizedRleValuesReader extends ValuesReader while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); int n = Math.min(left, this.currentCount); - advance(n); + switch (mode) { + case RLE: + break; + case PACKED: + currentBufferIdx += n; + break; + } + currentCount -= n; left -= n; } } @@ -403,20 +441,6 @@ public final class VectorizedRleValuesReader extends ValuesReader throw new UnsupportedOperationException("only skipIntegers is valid"); } - /** - * Advance and skip the next `n` values in the current block. `n` MUST be <= `currentCount`. - */ - private void advance(int n) { - switch (mode) { - case RLE: - break; - case PACKED: - currentBufferIdx += n; - break; - } - currentCount -= n; - } - /** * Reads the next varint encoded int. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala index 8a217febe1..5f1c5b5cdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala @@ -38,7 +38,7 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar * col_1 500 500 500 500 * |---------|---------|---------|---------| * |-------|-----|-----|---|---|---|---|---| - * col_2 400 300 200 200 200 200 200 200 + * col_2 400 300 300 200 200 200 200 200 */ def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = { Seq(true, false).foreach { enableDictionary => @@ -92,4 +92,14 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar ) checkUnalignedPages(df)(actions: _*) } + + test("SPARK-36123: reading from unaligned pages - test filters with nulls") { + // insert 50 null values in [400, 450) to verify that they are skipped during processing row + // range [500, 1000) against the second page of col_2 [400, 800) + var df = spark.range(0, 2000).map { i => + val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt + (i, strVal) + }.toDF() + checkUnalignedPages(df)(actions: _*) + } }