spark-instrumented-optimizer/sql/core
Jungtaek Lim (HeartSaVioR) cbb714f67e [SPARK-29438][SS] Use partition ID of StateStoreAwareZipPartitionsRDD for determining partition ID of state store in stream-stream join
### What changes were proposed in this pull request?

Credit to uncleGen for discovering the problem and providing simple reproducer as UT. New UT in this patch is borrowed from #26156 and I'm retaining a commit from #26156 (except unnecessary part on this path) to properly give a credit.

This patch fixes the issue that partition ID could be mis-assigned when the query contains UNION and stream-stream join is placed on the right side. We assume the range of partition IDs as `(0 ~ number of shuffle partitions - 1)` for stateful operators, but when we use stream-stream join on the right side of UNION, the range of partition ID of task goes to `(number of partitions in left side, number of partitions in left side + number of shuffle partitions - 1)`, which `number of partitions in left side` can be changed in some cases (new UT points out the one of the cases).

The root reason of bug is that stream-stream join picks the partition ID from TaskContext, which wouldn't be same as partition ID from source if union is being used. Hopefully we can pick the right partition ID from source in StateStoreAwareZipPartitionsRDD - this patch leverages that partition ID.

### Why are the changes needed?

This patch will fix the broken of assumption of partition range on stateful operator, as well as fix the issue reported in JIRA issue SPARK-29438.

### Does this PR introduce any user-facing change?

Yes, if their query is using UNION and stream-stream join is placed on the right side. They may encounter the problem to read state from checkpoint and may need to discard checkpoint to continue.

### How was this patch tested?

Added UT which fails on current master branch, and passes with this patch.

Closes #26162 from HeartSaVioR/SPARK-29438.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2020-01-30 20:21:43 -08:00
..
benchmarks [SPARK-30323][SQL] Support filters pushdown in CSV datasource 2020-01-16 13:10:08 +09:00
src [SPARK-29438][SS] Use partition ID of StateStoreAwareZipPartitionsRDD for determining partition ID of state store in stream-stream join 2020-01-30 20:21:43 -08:00
v1.2/src [SPARK-30578][SQL][TEST] Explicitly set conf to use DSv2 for orc in OrcFilterSuite 2020-01-20 21:42:33 +08:00
v2.3/src [SPARK-30578][SQL][TEST] Explicitly set conf to use DSv2 for orc in OrcFilterSuite 2020-01-20 21:42:33 +08:00
pom.xml [INFRA] Reverts commit 56dcd79 and c216ef1 2019-12-16 19:57:44 -07:00