7c59aeeef4
### What changes were proposed in this pull request? When using dynamic partition overwrite, each task has its working dir under staging dir like `stagingDir/.spark-staging-{jobId}`, each task commits to `outputPath/.spark-staging-{jobId}/{partitionId}/part-{taskId}-{jobId}{ext}`. When speculation enable, multiple task attempts would be setup for one task, **they have same task id and they would commit to same file concurrently**. Due to host done or node preemption, the partly-committed files aren't cleaned up, a FileAlreadyExistsException would be raised in this situation, resulting in job failure. I don't try to change task commit process for dynamic partition overwrite, like adding attempt id to task working dir for each attempts and committing to final output dir via a new outputCommitCoordinator, here is reason: 1. `FileOutputCommitter` already has commit coordinator for each task attempts, we can leverage it rather than build a new one. 2. To say the least, we implement a coordinator solving task attempts commit conflict, suppose a severe case, application master failover, tasks with same attempt id and same task id would commit to same files, the `FileAlreadyExistsException` risk still exists In this pr, I leverage FileOutputCommitter to solve the problem: 1. when initing a write job description, set `outputPath/.spark-staging-{jobId}` as the output dir 2. each task attempt writes output to `outputPath/.spark-staging-{jobId}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}/{partitionId}/part-{taskId}-{jobId}{ext}` 3. leverage `FileOutputCommitter` coordinator, write job firstly commits output to `outputPath/.spark-staging-{jobId}/{partitionId}` 4. for dynamic partition overwrite, write job finally move `outputPath/.spark-staging-{jobId}/{partitionId}` to `outputPath/{partitionId}` ### Why are the changes needed? Without this pr, dynamic partition overwrite would fail ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? added UT. Closes #29000 from WinkerDu/master-fix-dynamic-partition-multi-commit. Authored-by: duripeng <duripeng@baidu.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
---|---|---|
.. | ||
benchmarks | ||
src | ||
pom.xml |