7e8eb0447b
### What changes were proposed in this pull request? This PR adds a check to RowReader#hasNextRow such that multiple calls to RowReader#hasNextRow with no intervening call to RowReader#nextRow will avoid consuming more than 1 record. This PR also modifies RowReader#nextRow such that consecutive calls will return new rows (previously consecutive calls would return the same row). ### Why are the changes needed? SPARK-32346 slightly refactored the AvroFileFormat and AvroPartitionReaderFactory to use a new iterator-like trait called AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and stores the deserialized row for the next call to RowReader#nextRow. Unfortunately, sometimes hasNextRow is called twice before nextRow is called, resulting in a lost row. For example (which assumes V1 Avro reader): ```scala val df = spark.range(0, 25).toDF("index") df.write.mode("overwrite").format("avro").save("index_avro") val loaded = spark.read.format("avro").load("index_avro") // The following will give the expected size loaded.collect.size // The following will give the wrong size loaded.orderBy("index").collect.size ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added tests, which fail without the fix. Closes #30221 from bersprockets/avro_iterator_play. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
---|---|---|
.. | ||
avro | ||
docker | ||
docker-integration-tests | ||
kafka-0-10 | ||
kafka-0-10-assembly | ||
kafka-0-10-sql | ||
kafka-0-10-token-provider | ||
kinesis-asl | ||
kinesis-asl-assembly | ||
spark-ganglia-lgpl |