spark-instrumented-optimizer/common
Chandni Singh d00f0695b7 [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion
### What changes were proposed in this pull request?
This is the shuffle writer side change where executors can push data to remote shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
Summary of changes:
- This adds support for executors to push shuffle blocks after map tasks complete writing shuffle data.
- This also introduces a timeout specifically for creating connection to remote shuffle services.

### Why are the changes needed?
- These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
- The main reason to create a separate connection creation timeout is because the existing `connectionTimeoutMs` is overloaded and is used for connection creation timeouts as well as connection idle timeout. The connection creation timeout should be much lower than the idle timeouts. The default for `connectionTimeoutMs` is 120s. This is quite high for just establishing the connections.  If a shuffle server node is bad then the connection creation will fail within few seconds. However, an overloaded shuffle server may take much longer to respond to a request and the channel can stay idle for a much longer time which is expected.  Another reason is that with push-based shuffle, an executor may be fetching shuffle data and pushing shuffle data (next stage) simultaneously. Both these tasks will share the same connections with the shuffle service. If there is a bad shuffle server node and the connection creation timeout is very high then both these tasks end up waiting a long time time eventually impacting the performance.

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces client-side configs for push-based shuffle. If push-based shuffle is turned-off then the users will not see any change.

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com

Closes #30312 from otterc/SPARK-32917.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linked.in.com>
Co-authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-01-08 12:21:56 -06:00
..
kvstore [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00
network-common [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion 2021-01-08 12:21:56 -06:00
network-shuffle [SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal 2020-12-23 12:42:18 -06:00
network-yarn [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00
sketch [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00
tags [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00
unsafe [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00