spark-instrumented-optimizer/core
“attilapiros” fd2bf55aba [SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host
## What changes were proposed in this pull request?

Before this PR `ShuffleBlockFetcherIterator` was partitioning the block fetches into two distinct sets: local reads and remote fetches. Within this PR (when the feature is enabled by "spark.shuffle.readHostLocalDisk.enabled") a new category is introduced: host-local reads. They are shuffle block fetches where although the block manager is different they are running on the same host along with the requester.

Moreover to get the local directories of the other executors/block managers a new RPC message is introduced `GetLocalDirs` which is sent the the block manager master where it is answered as `BlockManagerLocalDirs`. In `BlockManagerMasterEndpoint` for answering this request the `localDirs` is extracted from the `BlockManagerInfo` and stored separately in a hash map called `executorIdLocalDirs`. Because the earlier used `blockManagerInfo` contains data for the alive block managers (see `org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager`).

Now `executorIdLocalDirs` knows all the local dirs up to the application start (like the external shuffle service does) so in case of an RDD recalculation both host-local shuffle blocks and disk persisted RDD blocks on the same host can be served by reading the files behind the blocks directly.

## How was this patch tested?

### Unit tests

`ExternalShuffleServiceSuite`:
- "SPARK-27651: host local disk reading avoids external shuffle service on the same node"

`ShuffleBlockFetcherIteratorSuite`:
- "successful 3 local reads + 4 host local reads + 2 remote reads"

And with extending existing suites where shuffle metrics was tested.

### Manual tests

Running Spark on YARN in a 4 nodes cluster with 6 executors and having 12 shuffle blocks.

```
$ grep host-local experiment.log
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_2_1, shuffle_0_6_1
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 38 ms
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_0_0, shuffle_0_8_0
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 35 ms
```

Closes #25299 from attilapiros/SPARK-27651.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-26 11:02:25 -08:00
..
benchmarks [SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus 2019-10-23 18:17:37 -07:00
src [SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host 2019-11-26 11:02:25 -08:00
pom.xml [SPARK-30009][CORE][SQL] Support different floating-point Ordering for Scala 2.12 / 2.13 2019-11-26 08:25:53 -08:00