### What changes were proposed in this pull request?
Add provided Guava dependency to `network-yarn` module.
### Why are the changes needed?
In Spark 3.1 and earlier, the `network-yarn` module implicitly relied on Guava from the `hadoop-client` dependency. This changed with SPARK-33212, where we moved to the shaded Hadoop client, which no longer exposes the transitive Guava dependency. Things stayed fine for a while since we were not using `createDependencyReducedPom`, so the build picked up the transitive dependency from `spark-network-common` instead. However, things started to break after SPARK-36835, where we restored `createDependencyReducedPom`, and now the build is no longer able to locate Guava classes:
```
build/mvn test -pl common/network-yarn -Phadoop-3.2 -Phive-thriftserver -Pkinesis-asl -Pkubernetes -Pmesos -Pnetlib-lgpl -Pscala-2.12 -Pspark-ganglia-lgpl -Pyarn
...
[INFO] Compiling 1 Java source to /Users/sunchao/git/spark/common/network-yarn/target/scala-2.12/classes ...
[WARNING] [Warn] : bootstrap class path not set in conjunction with -source 8
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:32: package com.google.common.annotations does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:33: package com.google.common.base does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:34: package com.google.common.collect does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:118: cannot find symbol
symbol: class VisibleForTesting
location: class org.apache.spark.network.yarn.YarnShuffleService
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with the above `mvn` command and it's now passing.
Closes #34125 from sunchao/SPARK-36873.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 53f58b6e51)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and pass it through SparkEnv instead.
### Why are the changes needed?
Push-based shuffle will fail if there is any attemptId set in the SparkConf, as the attemptId is not set correctly in the Driver.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.
Closes #34018 from zhouyejoe/SPARK-36772.
Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit cabc36b54d)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Improve exception handling in the Platform initialization, where it attempts to assess whether reflection can be used to modify DirectByteBuffer. This can apparently fail in more cases on Java 9+ than are currently handled, whereas Spark can continue without reflection if needed.
More detailed comments on the change inline.
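A minimal sketch of the hardened initialization described above (class and field names here are illustrative; the real logic lives in `Platform`'s static initializer), assuming any failure to make the constructor accessible simply disables the reflection path:
```java
import java.lang.reflect.Constructor;

// Sketch: probe whether DirectByteBuffer can be constructed reflectively,
// and fall back to a null constructor (reflection disabled) on any failure.
final class DirectBufferProbe {
  static final Constructor<?> DBB_CONSTRUCTOR;

  static {
    Constructor<?> ctor = null;
    try {
      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
      ctor = cls.getDeclaredConstructor(long.class, int.class);
      // On Java 9+ this may throw InaccessibleObjectException (a RuntimeException)
      // when java.base does not open java.nio to the unnamed module.
      ctor.setAccessible(true);
    } catch (ReflectiveOperationException | RuntimeException e) {
      ctor = null; // continue without reflection
    }
    DBB_CONSTRUCTOR = ctor;
  }
}
```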
### Why are the changes needed?
This exception seems to be possible and fails startup:
```
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @71e9ddb4
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
```
### Does this PR introduce _any_ user-facing change?
Should strictly allow Spark to continue in more cases.
### How was this patch tested?
Existing tests.
Closes #33947 from srowen/SPARK-36704.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit e5283f5ed5)
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Use WeakReference not SoftReference in LevelDB
### Why are the changes needed?
(See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 )
"The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize)
This is because Java is more conservative in cleaning up SoftReferences.
The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files.
Changing from SoftReference to WeakReference should make it much more aggressive in cleanup and prevent the issue - which we observed in a 3.1 SHS"
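For illustration only, a toy demo of the difference the quoted discussion describes (not Spark code; exact GC behavior is JVM-dependent, hence the hedged comments):
```java
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class RefDemo {
  public static void main(String[] args) {
    SoftReference<Object> soft = new SoftReference<>(new Object());
    WeakReference<Object> weak = new WeakReference<>(new Object());

    System.gc(); // a hint; weakly reachable referents are typically cleared here

    // The weak referent is usually gone after a GC cycle, while the soft
    // referent tends to survive until the JVM is under memory pressure.
    System.out.println("soft cleared: " + (soft.get() == null));
    System.out.println("weak cleared: " + (weak.get() == null));
  }
}
```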
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
Closes #33859 from srowen/SPARK-36603.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 89e907f76c)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Minor changes to change the config key name from `spark.shuffle.server.mergedShuffleFileManagerImpl` to `spark.shuffle.push.server.mergedShuffleFileManagerImpl`. This is missed out in https://github.com/apache/spark/pull/33615.
### Why are the changes needed?
To keep the config names consistent
### Does this PR introduce _any_ user-facing change?
Yes, this is a change in the config key name, but the new config name has not yet been released, so technically there is no user-facing change.
### How was this patch tested?
Existing tests.
Closes #33799 from venkata91/SPARK-36374-follow-up.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 7b2842e986)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
AuthEngineSuite was passing on some platforms (MacOS), but failing on others (Linux) with an InvalidKeyException stemming from the `SecretKeySpec` construction. We should explicitly pass AES as the key algorithm.
### What changes were proposed in this pull request?
Changes the AuthEngine SecretKeySpec from "RAW" to "AES".
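For illustration, a minimal runnable sketch of the difference (not the actual AuthEngine code; the cipher transformation and zeroed key bytes below are just example assumptions):
```java
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class KeySpecDemo {
  public static void main(String[] args) throws Exception {
    byte[] keyBytes = new byte[16]; // 128-bit key material, derived elsewhere in practice

    // "RAW" leaves the key algorithm unspecified; some providers (e.g. on Linux)
    // reject such a key with InvalidKeyException when initializing an AES cipher.
    SecretKeySpec key = new SecretKeySpec(keyBytes, "AES"); // explicit AES, as in the fix

    Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(new byte[16]));
    System.out.println("Cipher initialized with key algorithm: " + key.getAlgorithm());
  }
}
```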
### Why are the changes needed?
Unit tests were failing on some platforms with InvalidKeyExceptions when this key was used to instantiate a Cipher.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests on a MacOS and Linux platform.
Closes #33790 from sweisdb/patch-1.
Authored-by: sweisdb <60895808+sweisdb@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit c441c7e365)
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates that a PushBlockStream message was received after a new application attempt has started. This error should be handled correctly on the client without retrying the block push.
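As a hedged illustration of the intended client-side behavior (the constant and method names below are hypothetical, not necessarily Spark's actual `BlockPushErrorHandler` identifiers):
```java
// Hypothetical sketch: treat a "too old attempt" push failure as non-retriable
// and not worth logging, since a newer application attempt has taken over.
public class BlockPushErrorHandlerSketch {
  // Marker the server embeds in the error response (hypothetical text).
  public static final String TOO_OLD_ATTEMPT_SUFFIX =
      "is a stale block push from an older application attempt";

  public boolean shouldRetryError(Throwable t) {
    String msg = t.getMessage();
    return msg == null || !msg.contains(TOO_OLD_ATTEMPT_SUFFIX);
  }

  public boolean shouldLogError(Throwable t) {
    String msg = t.getMessage();
    return msg == null || !msg.contains(TOO_OLD_ATTEMPT_SUFFIX);
  }
}
```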
### Why are the changes needed?
When we get a block push failure because of a too-old attempt, we will neither retry pushing the block nor log the exception on the client side.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add the corresponding unit test.
Closes #33617 from zhuqi-lucas/master.
Authored-by: zhuqi-lucas <821684824@qq.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 05cd5f97c3)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
Document the push-based shuffle feature with a high-level overview of the feature and the corresponding configuration options for both the shuffle server side and the client side. This is how the changes to the doc look in the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
### Why are the changes needed?
Helps users understand the feature
### Does this PR introduce _any_ user-facing change?
Docs
### How was this patch tested?
N/A
Closes #33615 from venkata91/SPARK-36374.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 2270ecf32f)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
Fix an intermittent test failure due to Netty dependency version bump.
Starting from Netty 4.1.52, its AbstractChannel throws a new `StacklessClosedChannelException` for channel-closed exceptions.
A hardcoded list of Strings to match for channel-closed exceptions in `RPCIntegrationSuite` was not updated, thus leading to the intermittent test failure reported in #33613.
### Why are the changes needed?
Fix intermittent test failure
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #33713 from Victsm/SPARK-36378-followup.
Authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit b8e2186fe1)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
We have run performance evaluations on the version of push-based shuffle committed to upstream so far, and have identified a few places for further improvements:
1. On the server side, we have noticed that the usage of `String.format`, especially when receiving a block push request, has a much higher overhead compared with string concatenation.
2. On the server side, the usage of `Throwables.getStackTraceAsString` in the `ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` has generated quite some overhead.
These 2 issues are related to how we are currently handling certain common block push failures.
We are communicating such failures via `RpcFailure` by transmitting the exception stack trace.
This generates the overhead on both server and client side for creating these exceptions and makes checking the type of failures fragile and inefficient with string matching of exception stack trace.
To address these, this PR also proposes to encode the common block push failure as an error code and send that back to the client with a proper RPC message.
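To make that last point concrete, here is a hedged sketch of encoding common push failures as error codes (the enum name and values are hypothetical, not the exact identifiers introduced by the PR):
```java
// Hypothetical encoding of common block push failures as compact error codes,
// avoiding Throwables.getStackTraceAsString and string matching on the client.
public enum BlockPushErrorCode {
  SUCCESS((byte) 0),
  TOO_LATE_BLOCK_PUSH((byte) 1),  // push arrived after the shuffle merge was finalized
  STALE_BLOCK_PUSH((byte) 2);     // push from an older application attempt

  public final byte id;

  BlockPushErrorCode(byte id) { this.id = id; }

  public static BlockPushErrorCode fromId(byte id) {
    for (BlockPushErrorCode c : values()) {
      if (c.id == id) return c;
    }
    throw new IllegalArgumentException("Unknown error code: " + id);
  }
}
```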
### Why are the changes needed?
Improve shuffle service efficiency for push-based shuffle.
Improve code robustness for handling block push failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #33613 from Victsm/SPARK-36378.
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 3f09093a21)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
Pull the NoOpMergedShuffleFileManager inner class out to the top level. This is required since passing a dollar sign ($) in the config (`spark.shuffle.server.mergedShuffleFileManagerImpl`) value can be an issue. Currently `spark.shuffle.server.mergedShuffleFileManagerImpl` defaults to `org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager`. After this change the default value will be `org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager`.
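For illustration, a small runnable sketch of why the `$` matters (the loading call is shown only in a comment; this is not the actual Spark config-loading code):
```java
// Illustrative: an inner class's binary name contains '$', which is awkward
// to pass through configuration; a top-level class avoids that entirely.
public class ConfigNameDemo {
  public static void main(String[] args) {
    // Old default (inner class; note the '$' in the binary name):
    String inner =
        "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager";
    // New default (top-level class):
    String topLevel =
        "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager";
    // Either would be loaded the same way, e.g.:
    // Class.forName(conf.get("spark.shuffle.push.server.mergedShuffleFileManagerImpl"));
    System.out.println(inner + " vs " + topLevel);
  }
}
```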
### Why are the changes needed?
Passing `$` for the config value can be an issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing unit tests.
Closes #33688 from venkata91/SPARK-36460.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit df0de83c46)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Clean up `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplication. Currently this is based off of https://github.com/apache/spark/pull/33034; those changes (and the WIP marker) will be removed once that PR is merged.
### Why are the changes needed?
Minor cleanup to make code more readable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No tests, just changing log messages
Closes #33561 from venkata91/SPARK-36332.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit ab897109a3)
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
When a channel is terminated, `connectionTerminated` is called and the corresponding StreamState is removed; any subsequent request on that StreamState then throws an NPE like:
```
2021-07-31 22:00:24,810 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1119950114515,chunkIndex=0],errorString=java.lang.NullPointerException
at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:80)
at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:101)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
] to /ip:53818; closing connection
java.nio.channels.ClosedChannelException
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
Since the JVM omits the stack trace of an NPE once it has been thrown many times (HotSpot's fast-throw optimization), the log ends up looking like this:
```
2021-07-28 08:25:44,720 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1187623335353,chunkIndex=11],errorString=java.lang.NullPointerException
] to /10.130.10.5:42148; closing connection
java.nio.channels.ClosedChannelException
```
This confuses users. We should improve this error message.
### Why are the changes needed?
Improve error message
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #33622 from AngersZhuuuu/SPARK-36391.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit b377ea26e2)
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
Clean up older shuffleMergeId shuffle files when a finalize request for a higher shuffleMergeId is received and no blocks were pushed for the corresponding shuffleMergeId. This was identified as part of https://github.com/apache/spark/pull/33034#discussion_r680610872.
### Why are the changes needed?
Without this change, older shuffleMergeId files won't be cleaned up properly.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added changes to existing unit test to address this case.
Closes #33605 from venkata91/SPARK-32923-follow-on.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit d8169493b6)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
This PR adds support to diagnose shuffle data corruption. Basically, the diagnosis mechanism works like this:
The shuffle reader calculates the checksum (c1) for the corrupted shuffle block and sends it to the server where the block is stored. At the server, it reads back the checksum (c2) that is stored in the checksum file and recalculates the checksum (c3) for the corresponding shuffle block. Then, if c2 != c3, we suspect the corruption is caused by a disk issue. Otherwise, if c1 != c3, we suspect the corruption is caused by a network issue. Otherwise, the checksum verification passes and the cause remains unknown.
After the shuffle reader receives the diagnosis response, it takes action based on the type of cause. Only in the case of a network issue do we retry; otherwise, we throw the fetch failure directly. Also note that if the corruption happens inside BufferReleasingInputStream, the reducer will throw the fetch failure immediately no matter what the cause is, since the data has been partially consumed by downstream RDDs. If corruption happens again after the retry, the reducer will throw the fetch failure directly this time without the diagnosis.
Please check out https://github.com/apache/spark/pull/32385 to see the completed proposal of the shuffle checksum project.
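A hedged sketch of that decision rule (the enum and method are illustrative; Spark's actual diagnosis code may differ in detail):
```java
// Illustrative decision rule for shuffle corruption diagnosis.
public class ChecksumDiagnosisSketch {
  enum Cause { DISK_ISSUE, NETWORK_ISSUE, CHECKSUM_VERIFY_PASS }

  // c1: recomputed by the reader; c2: stored in the checksum file on the server;
  // c3: recomputed by the server from the shuffle block on disk.
  static Cause diagnose(long c1, long c2, long c3) {
    if (c2 != c3) return Cause.DISK_ISSUE;     // stored checksum and on-disk data disagree
    if (c1 != c3) return Cause.NETWORK_ISSUE;  // data changed in transit
    return Cause.CHECKSUM_VERIFY_PASS;         // verification passes; cause unknown
  }
}
```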
### Why are the changes needed?
Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases and even harder to root-cause. We don't know if it's a Spark issue or not. With diagnosis support for shuffle corruption, Spark itself can at least distinguish between disk and network causes, which is very important for users.
### Does this PR introduce _any_ user-facing change?
Yes, users may know the cause of the shuffle corruption after this change.
### How was this patch tested?
Added tests.
Closes #33451 from Ngone51/SPARK-36206.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit a98d919da4)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
[SPARK-23243](https://issues.apache.org/jira/browse/SPARK-23243) and [SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341) addressed cases of stage retries for indeterminate stages involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage for a shuffle ID, so the changes explained below are necessary.
Core changes are summarized as follows:
1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency`, a monotonically increasing value tracking the temporal ordering of executions of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly, make changes in the push-based shuffle protocol layer in `MergedShuffleFileManager` and `BlockStoreClient`, passing the `shuffleMergeId` in order to keep track of the shuffle output in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` in the case of an indeterminate stage execution.
4. A deterministic stage will have `shuffleMergeId` set to 0, as no special handling is needed in this case; an indeterminate stage will have `shuffleMergeId` starting from 1.
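A simplified sketch of the rule in items 3 and 4 (illustrative names, not the actual `ShuffleDependency` API):
```java
// Illustrative: determinate stages keep shuffleMergeId 0; every retry of an
// indeterminate stage bumps it so the shuffle service keeps outputs separate.
class ShuffleDependencySketch {
  private int shuffleMergeId = 0;

  void newShuffleMergeState(boolean stageIsIndeterminate) {
    if (stageIsIndeterminate) {
      shuffleMergeId++; // first indeterminate attempt gets 1, the next retry 2, ...
    }
  }

  int getShuffleMergeId() { return shuffleMergeId; }
}
```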
### Why are the changes needed?
New protocol changes are needed due to the reasons explained above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite`, `DAGSchedulerSuite`, `BlockIdSuite`, and `ErrorHandlerSuite`.
Closes #33034 from venkata91/SPARK-32923.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c039d99812)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514)
In this PR, the following changes are made:
1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse.
2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push.
3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push.
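A simplified sketch of the hierarchy described in items 1 and 2 (signatures abbreviated; the real interfaces also carry the transferred `ManagedBuffer`, etc.):
```java
// Simplified sketch: a common parent listener so fetch and push share retry code.
interface BlockTransferListener {
  void onBlockTransferSuccess(String blockId);
  void onBlockTransferFailure(String blockId, Throwable exception);
}

// Fetch- and push-specific listeners share the common parent, so a single
// RetryingBlockTransferor can retry either kind of transfer.
interface BlockFetchingListener extends BlockTransferListener { }
interface BlockPushingListener extends BlockTransferListener { }
```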
### Why are the changes needed?
To make code cleaner without sacrificing backward compatibility.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #33340 from Victsm/SPARK-32915-followup.
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c4aa54ed4e)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
`ExternalBlockHandler` exposes 4 metrics which are Dropwizard `Timer` metrics, and are named with a `millis` suffix:
```
private final Timer openBlockRequestLatencyMillis = new Timer();
private final Timer registerExecutorRequestLatencyMillis = new Timer();
private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
```
However these Dropwizard Timers by default use nanoseconds ([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)).
This causes `YarnShuffleServiceMetrics` to expose confusingly-named metrics like `openBlockRequestLatencyMillis_nanos_max` (the actual values are currently in nanos).
This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which accepts a `TimeUnit` at creation time and exposes timing information using this time unit when values are read. Internally, values are still stored with nanosecond-level precision. The `Timer` metrics within `ExternalBlockHandler` are updated to use the new class with milliseconds as the unit. The logic to include the `nanos` suffix in the metric name within `YarnShuffleServiceMetrics` has also been removed, with the assumption that the metric name itself includes the units.
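To show just the unit-conversion idea, here is a hedged sketch. Note the real change subclasses `Timer` (so existing metric registries keep working) and converts the full `Snapshot`; this sketch merely wraps a `Timer` for brevity:
```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

// Sketch: record with nanosecond precision (as Dropwizard Timers always do),
// but convert snapshot values to the desired unit when they are read.
public final class TimerUnitSketch {
  private final Timer timer = new Timer();
  private final TimeUnit unit;

  public TimerUnitSketch(TimeUnit unit) {
    this.unit = unit;
  }

  public void update(long duration, TimeUnit durationUnit) {
    timer.update(duration, durationUnit);
  }

  // With unit = MILLISECONDS, a recorded 5_000_000 ns reads back as 5.0.
  public double max() {
    return timer.getSnapshot().getMax() / (double) unit.toNanos(1);
  }

  public double median() {
    return timer.getSnapshot().getMedian() / (double) unit.toNanos(1);
  }
}
```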
### Does this PR introduce _any_ user-facing change?
Yes, there are two changes.
First, the names for metrics exposed by `ExternalBlockHandler` via `YarnShuffleServiceMetrics` such as `openBlockRequestLatencyMillis_nanos_max` and `openBlockRequestLatencyMillis_nanos_50thPercentile` have been changed to remove the `_nanos` suffix. This would be considered a breaking change, but these names were only exposed as part of #32388, which has not yet been released (slated for 3.2.0). New names are like `openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis_50thPercentile`
Second, the values of the metrics themselves have changed, to expose milliseconds instead of nanoseconds. Note that this does not affect metrics such as `openBlockRequestLatencyMillis_count` or `openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics (`max`, `median`, percentiles, etc.). For the YARN case, these metrics were also introduced by #32388, and thus also have not yet been released. It was possible for the nanosecond values to be consumed by some other metrics reporter reading the Dropwizard metrics directly, but I'm not aware of any such usages.
### How was this patch tested?
Unit tests have been updated.
Closes #33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit 70a15868fc)
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
Once the shuffle is cleaned up by the `ContextCleaner`, the shuffle files are deleted by the executors. In this case, the push of the shuffle data by the executors can throw `FileNotFoundException`s because the shuffle files are deleted. When this exception is thrown from the `shuffle-block-push-thread`, it causes the executor to exit. Both the `shuffle-block-push` threads and the netty event-loops will encounter `FileNotFoundException`s in this case. The fix here stops these threads from pushing more blocks when they encounter `FileNotFoundException`. When the exception is from the `shuffle-block-push-thread`, it will get handled and logged as warning instead of failing the executor.
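A hypothetical sketch of that guard (names and structure are illustrative, not the actual `ShuffleWriter` code):
```java
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: after the first FileNotFoundException, skip remaining
// pushes and log a warning instead of letting the uncaught error kill the executor.
class BlockPushGuardSketch {
  private static final Logger logger = LoggerFactory.getLogger(BlockPushGuardSketch.class);
  private final AtomicBoolean stopPushing = new AtomicBoolean(false);

  void pushBlock(Runnable push) {
    if (stopPushing.get()) {
      return; // shuffle files were already cleaned up; drop remaining pushes
    }
    try {
      push.run();
    } catch (RuntimeException e) {
      if (e.getCause() instanceof FileNotFoundException) {
        stopPushing.set(true);
        logger.warn("Shuffle files deleted, likely by shuffle cleanup; stop pushing", e);
      } else {
        throw e; // unrelated failures still propagate
      }
    }
  }
}
```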
### Why are the changes needed?
This fixes the bug which causes executors to exit when they are instructed to clean up shuffle data.
Below is the stacktrace of this exception:
```
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer
{file=********/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=*******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: ******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to verify that no more data is pushed when `FileNotFoundException` is encountered. Also verified in our environment.
Closes #33477 from otterc/SPARK-36255.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit 09e1c61272)
Signed-off-by: yi.wu <yi.wu@databricks.com>
This commit fixes the use of the "o.appAttemptId" variable in place of the mistaken "appAttemptId" variable; previously the code compared a value with itself. Jira issue report: SPARK-36273.
### What changes were proposed in this pull request?
This is a patch for SPARK-35546 which is needed for push-based shuffle.
### Why are the changes needed?
A very minor fix: reference the field of the other "FinalizeShuffleMerge" instance instead of comparing a field with itself.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No unit tests were added. It's a pretty logical change.
Closes #33493 from almogtavor/patch-1.
Authored-by: Almog Tavor <70065337+almogtavor@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 530c8addbb)
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of the change:
When an Executor registers with the Shuffle Service, it will encode the merged shuffle dir it created and the application attemptId into the ShuffleManagerMeta as JSON. The Shuffle Service will then decode the JSON string and get the correct merged shuffle dir and attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt.
This PR also refactored the management of the merged shuffle information to avoid concurrency issues.
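A hypothetical sketch of the JSON payload idea (field and class names are illustrative, not the exact Spark ones):
```java
// Hypothetical sketch of the registration payload: the executor encodes the
// merge directory and application attempt id as JSON in the shuffle manager meta;
// the shuffle service parses the same two fields back out on registration.
public class ShuffleManagerMetaSketch {
  public static String encode(String mergeDir, int attemptId) {
    return String.format("{\"mergeDir\": \"%s\", \"attemptId\": %d}", mergeDir, attemptId);
  }
}
```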
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Closes #33078 from zhouyejoe/SPARK-35546.
Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c77acf0bbc)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
This PR modifies the comment for `UTF8String.trimAll` and `sql-migration-guide.md`.
The comment for `UTF8String.trimAll` reads as follows.
```
Trims whitespaces ({literal <=} ASCII 32) from both ends of this string.
```
Similarly, `sql-migration-guide.md` describes the behavior of `cast` as follows.
```
In Spark 3.0, when casting string value to integral types(tinyint, smallint, int and bigint),
datetime types(date, timestamp and interval) and boolean type,
the leading and trailing whitespaces (<= ASCII 32) will be trimmed before converted to these type values,
for example, `cast(' 1\t' as int)` results `1`, `cast(' 1\t' as boolean)` results `true`,
`cast('2019-10-10\t' as date)` results the date value `2019-10-10`.
In Spark version 2.4 and below, when casting string to integrals and booleans,
it does not trim the whitespaces from both ends; the foregoing results is `null`,
while to datetimes, only the trailing spaces (= ASCII 32) are removed.
```
But SPARK-32559 (#29375) changed the behavior and only whitespace ASCII characters will be trimmed since Spark 3.0.1.
### Why are the changes needed?
To follow the previous change.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed that the document builds with the following command.
```
SKIP_API=1 bundle exec jekyll build
```
Closes #33287 from sarutak/fix-utf8string-trim-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 57a4f310df)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This adds two new additional metrics to `ExternalBlockHandler`:
- `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them
- `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS
Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`.
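One possible way to track these two quantities with Dropwizard primitives (illustrative; not necessarily how the PR implements `blockTransferAvgSize_1min`):
```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;

// Illustrative sketch: count blocks per second, and keep a trailing 1-minute
// window of block sizes whose mean approximates the average block size.
public class BlockTransferMetricsSketch {
  // Rate of blocks (not bytes) served by the shuffle service.
  private final Meter blockTransferRate = new Meter();
  private final Histogram blockSizes =
      new Histogram(new SlidingTimeWindowArrayReservoir(1, TimeUnit.MINUTES));

  public void onBlockTransferred(long sizeInBytes) {
    blockTransferRate.mark();
    blockSizes.update(sizeInBytes);
  }

  public double avgBlockSize1Min() {
    return blockSizes.getSnapshot().getMean();
  }
}
```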
### Why are the changes needed?
Currently `ExternalBlockHandler` exposes some useful metrics, but is lacking around metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs.
For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves collect detailed histograms of timing information. We should expose this information for better observability.
### Does this PR introduce _any_ user-facing change?
Yes, there are two entirely new metrics for the ESS, as documented in `monitoring.md`. Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information.
### How was this patch tested?
New unit tests are added to verify that new metrics are showing up as expected.
We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues.
Closes #32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This adds support in the ESS to serve merged shuffle block meta and data requests to executors.
This change is needed for fetching remote merged shuffle data from the remote shuffle services. This is part of push-based shuffle SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
This change introduces new messages between clients and the external shuffle service:
1. `MergedBlockMetaRequest`: The client sends this to the external shuffle service to get the meta information for a merged block. The response is one of these:
- `MergedBlockMetaSuccess`: contains the request id, number of chunks, and a `ManagedBuffer` which is a `FileSegmentManagedBuffer` backed by the merged block meta file.
- `RpcFailure`: this is sent back to client in case of failure. This is an existing message.
2. `FetchShuffleBlockChunks`: This is similar to `FetchShuffleBlocks` message but it is to fetch merged shuffle chunks instead of blocks.
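A hedged sketch of the message shapes (the fields follow the prose above; the actual encodable classes in Spark differ in detail):
```java
// Illustrative message shapes for the merged-block meta exchange.
final class MergedBlockMetaRequestSketch {
  final long requestId;
  final String appId;
  final int shuffleId;
  final int reduceId;

  MergedBlockMetaRequestSketch(long requestId, String appId, int shuffleId, int reduceId) {
    this.requestId = requestId;
    this.appId = appId;
    this.shuffleId = shuffleId;
    this.reduceId = reduceId;
  }
}

final class MergedBlockMetaSuccessSketch {
  final long requestId;
  final int numChunks;    // number of chunks in the merged block
  final byte[] chunkMeta; // stands in for the ManagedBuffer backed by the meta file

  MergedBlockMetaSuccessSketch(long requestId, int numChunks, byte[] chunkMeta) {
    this.requestId = requestId;
    this.numChunks = numChunks;
    this.chunkMeta = chunkMeta;
  }
}
```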
### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Closes #32811 from otterc/SPARK-35671.
Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR aims to promote `LevelDBSuite.IntKeyType` class to a normal class to isolate `InMemoryIteratorSuite` from `LevelDBSuite`.
### Why are the changes needed?
We have the following test suite hierarchy.
```
DBIteratorSuite
- InMemoryIteratorSuite
- LevelDBIteratorSuite
```
`DBIteratorSuite.testRefWithIntNaturalKey` depends on `LevelDBSuite`, and `InMemoryIteratorSuite` inherits it. `InMemoryIteratorSuite` should not depend on `LevelDB`-specific stuff. This PR makes sure of that.
```
public void testRefWithIntNaturalKey() throws Exception {
LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
...
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
```
$ build/sbt "kvstore/test"
```
Closes #32971 from dongjoon-hyun/SPARK-35824.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This change is for [SPARK-35757](https://issues.apache.org/jira/browse/SPARK-35757) and does the following:
1. adds bitwise AND operation to BitArray (similar to existing `putAll` method)
2. adds an intersect operation for combining bloom filters using bitwise AND operation (similar to existing `mergeInPlace` method).
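A minimal sketch of the bitwise-AND combination in item 1 (a hypothetical helper, not the actual `BitArray` API):
```java
// Illustrative sketch of combining two equally-sized bit arrays with bitwise AND.
public class BitArrayAndSketch {
  static long[] and(long[] a, long[] b) {
    if (a.length != b.length) {
      throw new IllegalArgumentException("Bit arrays must be the same size");
    }
    long[] result = new long[a.length];
    for (int i = 0; i < a.length; i++) {
      result[i] = a[i] & b[i]; // a bit survives only if set in both filters
    }
    return result;
  }
}
```
Intersecting two Bloom filters this way never introduces false negatives for items inserted into both filters, since such items have all their bits set in both bit arrays; it can only retain false positives.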
### Why are the changes needed?
The current bloom filter library only allows combining two bloom filters using OR operation. It is useful to have AND operation as well.
### Does this PR introduce _any_ user-facing change?
No, just adds new methods.
### How was this patch tested?
Just the existing tests.
Closes #32907 from kudhru/master.
Lead-authored-by: kudhru <gargdhruv36@gmail.com>
Co-authored-by: Dhruv Kumar <kudhru@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of changes:
Executors will create the merge directories under the application temp directory provided by YARN. The access control of the folder will be set to 770, so that the Shuffle Service can create merged shuffle files and write merged shuffle data into those files.
The Shuffle Service serves merged shuffle block fetch requests by reading the merged shuffle files.
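An illustrative sketch of the permission setup (the path below is hypothetical; YARN supplies the real application temp directory):
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;

// Illustrative: create the merge directory with 770 permissions so the
// shuffle service, which shares the group, can write merged files into it.
public class MergeDirSketch {
  public static void main(String[] args) throws IOException {
    Path mergeDir = Paths.get("/tmp/usercache/appattempt_0001/merge_manager");
    Files.createDirectories(mergeDir);
    Files.setPosixFilePermissions(mergeDir,
        PosixFilePermissions.fromString("rwxrwx---")); // 770
  }
}
```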
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Closes #32007 from zhouyejoe/SPARK-33350.
Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Remove some useless code in the ExternalBlockHandler.
### Why are the changes needed?
There is some useless code in the ExternalBlockHandler, so we can remove it.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #32571 from weixiuli/SPARK-35424.
Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>