spark-instrumented-optimizer/common
Venkata krishnan Sowrirajan c039d99812 [SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for push-based shuffle
### What changes were proposed in this pull request?
[[SPARK-23243](https://issues.apache.org/jira/browse/SPARK-23243)] and [[SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341)] addressed cases of stage retries for indeterminate stage involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage for a shuffle ID. Therefore the changes explained below are necessary.

Core changes are summarized as follows:

1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency` which is monotonically increasing value tracking the temporal ordering of execution of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly make changes in the push-based shuffle protocol layer in `MergedShuffleFileManager`, `BlockStoreClient` passing the `shuffleMergeId` in order to keep track of the shuffle output in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` in the cases of a indeterministic stage execution
4. Deterministic stage will have `shuffleMergeId` set to 0 as no special handling is needed in this case and indeterminate stage will have `shuffleMergeId` starting from 1.

### Why are the changes needed?

New protocol changes are needed due to the reasons explained above.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite, DAGSchedulerSuite, BlockIdSuite, ErrorHandlerSuite`

Closes #33034 from venkata91/SPARK-32923.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-01 23:16:33 -05:00
..
kvstore [SPARK-36362][CORE][SQL][FOLLOWUP] Fix java linter errors 2021-08-01 21:40:03 +09:00
network-common [SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for push-based shuffle 2021-08-01 23:16:33 -05:00
network-shuffle [SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for push-based shuffle 2021-08-01 23:16:33 -05:00
network-yarn [SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units 2021-07-24 21:26:18 +08:00
sketch [SPARK-35996][BUILD] Setting version to 3.3.0-SNAPSHOT 2021-07-02 13:47:36 -07:00
tags [SPARK-35996][BUILD] Setting version to 3.3.0-SNAPSHOT 2021-07-02 13:47:36 -07:00
unsafe [SPARK-36362][CORE][SQL][FOLLOWUP] Fix java linter errors 2021-08-01 21:40:03 +09:00