diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 9d88039457..af739a52d8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -203,8 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       long rangeEnd = state.currentRangeEnd();
 
       if (rowId + n < rangeStart) {
-        updater.skipValues(n, valueReader);
-        advance(n);
+        skipValues(n, state, valueReader, updater);
         rowId += n;
         leftInPage -= n;
       } else if (rowId > rangeEnd) {
@@ -217,8 +216,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
         // skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
-          updater.skipValues(toSkip, valueReader);
-          advance(toSkip);
+          skipValues(toSkip, state, valueReader, updater);
           rowId += toSkip;
           leftInPage -= toSkip;
         }
@@ -255,6 +253,39 @@ public final class VectorizedRleValuesReader extends ValuesReader
     state.advanceOffsetAndRowId(offset, rowId);
   }
 
+  /**
+   * Skip the next `n` values (either null or non-null) from this definition level reader and
+   * `valuesReader`.
+   */
+  private void skipValues(
+      int n,
+      ParquetReadState state,
+      VectorizedValuesReader valuesReader,
+      ParquetVectorUpdater updater) {
+    while (n > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int num = Math.min(n, this.currentCount);
+      switch (mode) {
+        case RLE:
+          // we only need to skip non-null values from `valuesReader` since nulls are represented
+          // via definition levels which are skipped here via decrementing `currentCount`.
+          if (currentValue == state.maxDefinitionLevel) {
+            updater.skipValues(num, valuesReader);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; ++i) {
+            // same as above, only skip non-null values from `valuesReader`
+            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
+              updater.skipValues(1, valuesReader);
+            }
+          }
+          break;
+      }
+      currentCount -= num;
+      n -= num;
+    }
+  }
   // The RLE reader implements the vectorized decoding interface when used to decode dictionary
   // IDs. This is different than the above APIs that decodes definitions levels along with values.
 
@@ -358,7 +389,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
       int n = Math.min(left, this.currentCount);
-      advance(n);
+      switch (mode) {
+        case RLE:
+          break;
+        case PACKED:
+          currentBufferIdx += n;
+          break;
+      }
+      currentCount -= n;
       left -= n;
     }
   }
@@ -403,20 +441,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
     throw new UnsupportedOperationException("only skipIntegers is valid");
   }
 
-  /**
-   * Advance and skip the next `n` values in the current block. `n` MUST be <= `currentCount`.
-   */
-  private void advance(int n) {
-    switch (mode) {
-      case RLE:
-        break;
-      case PACKED:
-        currentBufferIdx += n;
-        break;
-    }
-    currentCount -= n;
-  }
-
   /**
    * Reads the next varint encoded int.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
index 8a217febe1..5f1c5b5cdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
@@ -38,7 +38,7 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
    * col_1     500       500       500       500
    *  |---------|---------|---------|---------|
    *  |-------|-----|-----|---|---|---|---|---|
-   * col_2   400   300   200 200 200 200 200 200
+   * col_2   400   300   300 200 200 200 200 200
    */
   def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
     Seq(true, false).foreach { enableDictionary =>
@@ -92,4 +92,14 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
     )
     checkUnalignedPages(df)(actions: _*)
   }
+
+  test("SPARK-36123: reading from unaligned pages - test filters with nulls") {
+    // insert 50 null values in [400, 450) to verify that they are skipped during processing row
+    // range [500, 1000) against the second page of col_2 [400, 800)
+    val df = spark.range(0, 2000).map { i =>
+      val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt
+      (i, strVal)
+    }.toDF()
+    checkUnalignedPages(df)(actions: _*)
+  }
 }
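
Note (not part of the patch): a standalone Scala sketch of the invariant the new `skipValues` method relies on. Nulls never occupy entries in the Parquet values stream; they exist only as definition levels below the maximum, so skipping rows should advance `valuesReader` only for rows whose definition level equals the max definition level. The names `defLevels` and `valuesToSkip` are illustrative, not Spark APIs.

object DefinitionLevelSkipSketch {
  def main(args: Array[String]): Unit = {
    val maxDefinitionLevel = 1
    // Eight rows; rows 2-4 are null, so they appear only as definition levels below the max.
    val defLevels = Array(1, 1, 0, 0, 0, 1, 1, 1)

    // Skipping the first 6 rows must consume only 3 entries from the values stream,
    // mirroring the `updater.skipValues(...)` calls guarded by the definition-level check.
    val valuesToSkip = defLevels.take(6).count(_ == maxDefinitionLevel)
    assert(valuesToSkip == 3)
    println(s"skip 6 rows => skip $valuesToSkip entries in the values stream")
  }
}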
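Note (not part of the patch): a hedged, self-contained sketch of how the scenario covered by the new test might be reproduced outside the suite. The `parquet.page.size` value, output path, and filter below are assumptions chosen to produce small, unaligned pages for `col_2` with nulls in [400, 450); the suite's `checkUnalignedPages` helper applies its own write options and filters.

import org.apache.spark.sql.SparkSession

object UnalignedPagesWithNullsRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Same data shape as the new test: ids 0..1999, col_2 is null for ids in [400, 450).
    val df = spark.range(0, 2000).map { i =>
      val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt
      (i, strVal)
    }.toDF("id", "col_2")

    val path = "/tmp/unaligned-pages-with-nulls"  // hypothetical output location
    df.coalesce(1)
      .write
      .option("parquet.page.size", "4096")  // assumed value, small enough to split col_2 into several pages
      .mode("overwrite")
      .parquet(path)

    // With the fix, the vectorized reader skips the 50 nulls correctly while matching the
    // row range [500, 1000) against col_2's pages.
    spark.read.parquet(path).filter("id >= 500 and id < 1000").show(5)
    spark.stop()
  }
}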