spark-instrumented-optimizer/common
Yuanjian Li 239ee3f561 [SPARK-9853][CORE] Optimize shuffle fetch of continuous partition IDs
This PR takes over #19788. After the shuffle fetch protocol was split from `OpenBlocks` in #24565, this optimization can be extended to the new shuffle protocol. Credit to yucai; this closes #19788.

### What changes were proposed in this pull request?
This PR adds support for fetching continuous shuffle blocks in batch:

- Shuffle client changes:
    - Add a new feature flag `spark.shuffle.fetchContinuousBlocksInBatch` and implement the decision logic in `BlockStoreShuffleReader` (see the config sketch after this list).
    - Merge continuous shuffle block IDs into batches when needed in `ShuffleBlockFetcherIterator`.
- Shuffle server changes:
    - Add support in `ExternalBlockHandler` for the external shuffle service side.
    - Extend `ShuffleBlockResolver.getBlockData` to support fetching block data by range.
- Protocol changes:
    - Add a new block ID type, `ShuffleBlockBatchId`, to represent a range of continuous shuffle block IDs.
    - Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`.
    - Since the new shuffle fetch protocol was completed in #24565, backward compatibility with the external shuffle service can be controlled by `spark.shuffle.useOldFetchProtocol`.
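As an illustration, here is a minimal sketch of how a user might enable batch fetching; the application code and app name are hypothetical, while both config keys appear in this PR and in the test setup below.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable batch fetching of continuous shuffle blocks.
// The config keys come from this PR; the surrounding code is illustrative.
val spark = SparkSession.builder()
  .appName("BatchShuffleFetchExample") // hypothetical app name
  .config("spark.shuffle.fetchContinuousBlocksInBatch", "true")
  // Batch fetching matters most under adaptive execution, where one
  // reducer reads several continuous reduce partitions per map output.
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()
```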

### Why are the changes needed?
In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. With the original approach, however, each reducer fetches those blocks one by one, which requires many I/O operations and hurts performance. This PR adds support for fetching continuous shuffle blocks in one I/O (a batch). See the example below:

The shuffle blocks are stored as shown below:
![image](https://user-images.githubusercontent.com/2989575/51654634-c37fbd80-1fd3-11e9-935e-5652863676c3.png)
The shuffle block ID format is `s"shuffle_${shuffleId}_${mapId}_${reduceId}"`; see `BlockId.scala`.
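Each map output data file is accompanied by a shuffle index file of cumulative partition offsets, so a run of continuous reduce partitions corresponds to a single contiguous byte range. A hypothetical sketch of that mapping (`batchSegment` and `offsets` are illustrative names, not Spark's actual resolver API):

```scala
// offsets has numPartitions + 1 entries: offsets(i) is where reduce
// partition i starts in the data file, offsets(i + 1) is where it ends.
def batchSegment(offsets: Array[Long], startReduceId: Int, endReduceId: Int): (Long, Long) = {
  val start = offsets(startReduceId)
  val length = offsets(endReduceId) - start // endReduceId is exclusive
  (start, length) // one (offset, length) pair => one disk read
}
```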

In adaptive execution, one reducer may want to read the output for reduce partitions 5 to 14, whose block IDs range from `shuffle_0_x_5` to `shuffle_0_x_14`.
Before this PR, Spark needs 10 disk I/Os plus 10 network I/Os for each map output file.
After this PR, Spark needs only 1 disk I/O and 1 network I/O, which reduces I/O dramatically.
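To sketch the client-side merging idea, here is a simplified stand-in for what `ShuffleBlockFetcherIterator` does; the case classes below are local simplifications of the real block ID types, and the merge logic is illustrative rather than Spark's actual code:

```scala
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
case class ShuffleBlockBatchId(shuffleId: Int, mapId: Long,
    startReduceId: Int, endReduceId: Int) // endReduceId is exclusive

// Collapse continuous reduce IDs per map output into batch ranges.
def mergeContinuous(ids: Seq[ShuffleBlockId]): Seq[ShuffleBlockBatchId] =
  ids.sortBy(b => (b.shuffleId, b.mapId, b.reduceId))
    .foldLeft(List.empty[ShuffleBlockBatchId]) {
      // The newest batch sits at the head; extend it when the next block
      // is contiguous with it (same shuffle and map, adjacent reduce ID).
      case (head :: tail, b)
          if head.shuffleId == b.shuffleId && head.mapId == b.mapId &&
            head.endReduceId == b.reduceId =>
        head.copy(endReduceId = b.reduceId + 1) :: tail
      case (acc, b) =>
        ShuffleBlockBatchId(b.shuffleId, b.mapId, b.reduceId, b.reduceId + 1) :: acc
    }
    .reverse
```

Under this sketch, `shuffle_0_x_5` through `shuffle_0_x_14` (for a given map ID x) collapse into a single `ShuffleBlockBatchId` covering reduce IDs [5, 15), i.e. one fetch request instead of ten.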

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added new unit tests.
Ran integration tests with `spark.sql.adaptive.enabled=true`.

Closes #26040 from xuanyuanking/SPARK-9853.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-17 14:47:56 +08:00
| Directory | Last commit | Date |
|-----------|-------------|------|
| kvstore | [MINOR][BUILD] Fix about 15 misc build warnings | 2019-09-19 11:37:42 -07:00 |
| network-common | [SPARK-25341][CORE] Support rolling back a shuffle map stage and re-generate the shuffle files | 2019-09-23 16:16:52 +08:00 |
| network-shuffle | [SPARK-9853][CORE] Optimize shuffle fetch of continuous partition IDs | 2019-10-17 14:47:56 +08:00 |
| network-yarn | [SPARK-28593][CORE] Rename ShuffleClient to BlockStoreClient which more close to its usage | 2019-08-05 14:54:45 +08:00 |
| sketch | [SPARK-28604][ML] Use log1p(x) over log(1+x) and expm1(x) over exp(x)-1 for accuracy | 2019-08-04 17:04:01 -05:00 |
| tags | [SPARK-29191][TESTS][SQL] Add tag ExtendedSQLTest for SQLQueryTestSuite | 2019-09-22 13:53:21 -07:00 |
| unsafe | [SPARK-29369][SQL] Support string intervals without the interval prefix | 2019-10-14 23:34:18 +08:00 |