### What changes were proposed in this pull request?
Add provided Guava dependency to `network-yarn` module.
### Why are the changes needed?
In Spark 3.1 and earlier the `network-yarn` module implicitly relies on Guava from `hadoop-client` dependency. This was changed by SPARK-33212 where we moved to shaded Hadoop client which no longer expose the transitive Guava dependency. It stayed fine for a while since we were not using `createDependencyReducedPom` so it picks up the transitive dependency from `spark-network-common` instead. However, things start to break after SPARK-36835 where we restored `createDependencyReducedPom` and now it is no longer able to locate Guava classes:
```
build/mvn test -pl common/network-yarn -Phadoop-3.2 -Phive-thriftserver -Pkinesis-asl -Pkubernetes -Pmesos -Pnetlib-lgpl -Pscala-2.12 -Pspark-ganglia-lgpl -Pyarn
...
[INFO] Compiling 1 Java source to /Users/sunchao/git/spark/common/network-yarn/target/scala-2.12/classes ...
[WARNING] [Warn] : bootstrap class path not set in conjunction with -source 8
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:32: package com.google.common.annotations does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:33: package com.google.common.base does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:34: package com.google.common.collect does not exist
[ERROR] [Error] /Users/sunchao/git/spark/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:118: cannot find symbol
symbol: class VisibleForTesting
location: class org.apache.spark.network.yarn.YarnShuffleService
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with the above `mvn` command and it's now passing.
Closes#34125 from sunchao/SPARK-36873.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and parsing through SparkEnv.
### Why are the changes needed?
Push based shuffle will fail if there are any attemptId set in the SparkConf, as the attemptId is not set correctly in Driver.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested within our Yarn cluster. Without this PR, the Driver will fail to finalize the shuffle merge on all the mergers. After the patch, Driver can successfully finalize the shuffle merge and the push based shuffle can work fine.
Also with unit test to verify the attemptId is being set in the BlockStoreClient in Driver.
Closes#34018 from zhouyejoe/SPARK-36772.
Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Improve exception handling in the Platform initialization, where it attempts to assess whether reflection is possible to modify DirectByteBuffer. This can apparently fail in more cases on Java 9+ than are currently handled, whereas Spark can continue without reflection if needed.
More detailed comments on the change inline.
### Why are the changes needed?
This exception seems to be possible and fails startup:
```
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module 71e9ddb4
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
```
### Does this PR introduce _any_ user-facing change?
Should strictly allow Spark to continue in more cases.
### How was this patch tested?
Existing tests.
Closes#33947 from srowen/SPARK-36704.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Use WeakReference not SoftReference in LevelDB
### Why are the changes needed?
(See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 )
"The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize)
This is because java is more conservative in cleaning up SoftReference's.
The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files.
Changing from SoftReference to WeakReference should make it much more aggresive in cleanup and prevent the issue - which we observed in a 3.1 SHS"
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
Closes#33859 from srowen/SPARK-36603.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Minor changes to change the config key name from `spark.shuffle.server.mergedShuffleFileManagerImpl` to `spark.shuffle.push.server.mergedShuffleFileManagerImpl`. This is missed out in https://github.com/apache/spark/pull/33615.
### Why are the changes needed?
To keep the config names consistent
### Does this PR introduce _any_ user-facing change?
Yes, this is a change in the config key name. But the new config name changes are yet to be released. Technically there is no user facing change because of this change.
### How was this patch tested?
Existing tests.
Closes#33799 from venkata91/SPARK-36374-follow-up.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This pr revert the change of SPARK-34309, includes:
- https://github.com/apache/spark/pull/31517
- https://github.com/apache/spark/pull/33772
### Why are the changes needed?
1. No really performance improvement in Spark
2. Added an additional dependency
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#33784 from LuciferYang/revert-caffeine.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
AuthEngineSuite was passing on some platforms (MacOS), but failing on others (Linux) with an InvalidKeyException stemming from this line. We should explicitly pass AES as the key format.
### What changes were proposed in this pull request?
Changes the AuthEngine SecretKeySpec from "RAW" to "AES".
### Why are the changes needed?
Unit tests were failing on some platforms with InvalidKeyExceptions when this key was used to instantiate a Cipher.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests on a MacOS and Linux platform.
Closes#33790 from sweisdb/patch-1.
Authored-by: sweisdb <60895808+sweisdb@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
The main change of this pr is converting an int literal to a long literal to avoid potential integer multiplications overflow risk.
For example:
**Before**
```java
void f(int i) {
long val = 65536 * i;
}
```
**After**
```java
void f(int i) {
long val = 65536L * i;
}
```
### Why are the changes needed?
Avoid potential integer multiplications overflow risk
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#33629 from LuciferYang/cast-to-long.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates the PushblockStream message is received after a new application attempt has started. This error message should be correctly handled in client without retrying the block push.
### Why are the changes needed?
When we get a block push failure because of the too old attempt, we will not retry pushing the block nor log the exception on the client side.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add the corresponding unit test.
Closes#33617 from zhuqi-lucas/master.
Authored-by: zhuqi-lucas <821684824@qq.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Document the push-based shuffle feature with a high level overview of the feature and corresponding configuration options for both shuffle server side as well as client side. This is how the changes to the doc looks on the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
### Why are the changes needed?
Helps users understand the feature
### Does this PR introduce _any_ user-facing change?
Docs
### How was this patch tested?
N/A
Closes#33615 from venkata91/SPARK-36374.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Fix an intermittent test failure due to Netty dependency version bump.
Starting from Netty 4.1.52, its AbstractChannel will throw a new `StacklessClosedChannelException` for channel closed exception.
A hardcoded list of Strings to match for channel closed exception in `RPCIntegrationSuite` was not updated, thus leading to the intermittent test failure reported in #33613
### Why are the changes needed?
Fix intermittent test failure
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes#33713 from Victsm/SPARK-36378-followup.
Authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Fix a Java style issue blocking CI.
### Why are the changes needed?
Unblocking CI.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
Closes#33715 from viirya/hotfix.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
We have run performance evaluations on the version of push-based shuffle committed to upstream so far, and have identified a few places for further improvements:
1. On the server side, we have noticed that the usage of `String.format`, especially when receiving a block push request, has a much higher overhead compared with string concatenation.
2. On the server side, the usage of `Throwables.getStackTraceAsString` in the `ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` has generated quite some overhead.
These 2 issues are related to how we are currently handling certain common block push failures.
We are communicating such failures via `RPCFailure` by transmitting the exception stack trace.
This generates the overhead on both server and client side for creating these exceptions and makes checking the type of failures fragile and inefficient with string matching of exception stack trace.
To address these, this PR also proposes to encode the common block push failure as an error code and send that back to the client with a proper RPC message.
### Why are the changes needed?
Improve shuffle service efficiency for push-based shuffle.
Improve code robustness for handling block push failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes#33613 from Victsm/SPARK-36378.
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Pull out NoOpMergedShuffleFileManager inner class outside. This is required since passing dollar sign ($) for the config (`spark.shuffle.server.mergedShuffleFileManagerImpl`) value can be an issue. Currently `spark.shuffle.server.mergedShuffleFileManagerImpl` is by default set to `org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager`. After this change the default value be set to `org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager`
### Why are the changes needed?
Passing `$` for the config value can be an issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing unit tests.
Closes#33688 from venkata91/SPARK-36460.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Cleanup `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplications. Currently this is based off of https://github.com/apache/spark/pull/33034 will remove those changes once it is merged and remove the WIP at that time.
### Why are the changes needed?
Minor cleanup to make code more readable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No tests, just changing log messages
Closes#33561 from venkata91/SPARK-36332.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
When channel terminated will call `connectionTerminated` and remove corresponding StreamState,
then all coming request on this StreamState will throw NPE like
```
2021-07-31 22:00:24,810 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1119950114515,chunkIndex=0],errorString=java.lang.NullPointerException
at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:80)
at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:101)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
] to /ip:53818; closing connection
java.nio.channels.ClosedChannelException
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
Since JVM will not show stack of NPE exception if it happen many times.
```
021-07-28 08:25:44,720 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1187623335353,chunkIndex=11],errorString=java.lang.NullPoint
erException
] to /10.130.10.5:42148; closing connection
java.nio.channels.ClosedChannelException
```
Makes user confused.
We should improved this error message?
### Why are the changes needed?
Improve error message
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes#33622 from AngersZhuuuu/SPARK-36391.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmaihu@gmail.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
There are 3 ways to use Guava cache in spark code:
1. `Loadingcache` is the main way to use Guava cache in spark code and the key usages are as follows:
a. `LoadingCache` with `maximumsize` data eviction policy, such as `appCache` in `ApplicationCache`, `cache` in `Codegenerator`
b. `LoadingCache` with `maximumWeight` data eviction policy, such as `shuffleIndexCache` in `ExternalShuffleBlockResolver`
c. `LoadingCache` with 'expireAfterWrite' data eviction policy, such as `tableRelationCache` in `SessionCatalog`
2. `ManualCache` is another way to use Guava cache in spark code and the key usage is `cache` in `SharedInMemoryCache`, it use to caches partition file statuses in memory
3. The last use way is `hadoopJobMetadata` in `SparkEnv`, it uses Guava Cache to build a `soft-reference map`.
The goal of this pr is use `Caffeine` instead of `Guava Cache` because `Caffeine` is faster than `Guava Cache` from benchmarks, the main changes as follows:
1. Add `Caffeine` deps to maven `pom.xml`
2. Use `Caffeine` instead of Guava `LoadingCache`, `ManualCache` and soft-reference map in `SparkEnv`
3. Add `LocalCacheBenchmark` to compare performance of `Loadingcache` between `Guava Cache` and `Caffeine`
### Why are the changes needed?
`Caffeine` is faster than `Guava Cache` from benchmarks
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Add `LocalCacheBenchmark` to compare performance of `Loadingcache` between `Guava Cache` and `Caffeine`
Closes#31517 from LuciferYang/guava-cache-to-caffeine.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Holden Karau <hkarau@netflix.com>
### What changes were proposed in this pull request?
Clean up older shuffleMergeId shuffle files when finalize request for higher shuffleMergeId is received when no blocks pushed for the corresponding shuffleMergeId. This is identified as part of https://github.com/apache/spark/pull/33034#discussion_r680610872.
### Why are the changes needed?
Without this change, older shuffleMergeId files won't be cleaned up properly.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added changes to existing unit test to address this case.
Closes#33605 from venkata91/SPARK-32923-follow-on.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR adds support to diagnose shuffle data corruption. Basically, the diagnosis mechanism works like this:
The shuffler reader would calculate the checksum (c1) for the corrupted shuffle block and send it to the server where the block is stored. At the server, it would read back the checksum (c2) that is stored in the checksum file and recalculate the checksum (c3) for the corresponding shuffle block. Then, if c2 != c3, we suspect the corruption is caused by the disk issue. Otherwise, if c1 != c3, we suspect the corruption is caused by the network issue. Otherwise, the checksum verifies pass. In any case of the error, the cause remains unknown.
After the shuffle reader receives the diagnosis response, it'd take the action bases on the type of cause. Only in case of the network issue, we'd give a retry. Otherwise, we'd throw the fetch failure directly. Also note that, if the corruption happens inside BufferReleasingInputStream, the reducer will throw the fetch failure immediately no matter what the cause is since the data has been partially consumed by downstream RDDs. If corruption happens again after retry, the reducer will throw the fetch failure directly this time without the diagnosis.
Please check out https://github.com/apache/spark/pull/32385 to see the completed proposal of the shuffle checksum project.
### Why are the changes needed?
Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually reports corruption issue. However, data corruption is difficult to reproduce in most cases and even harder to tell the root cause. We don't know if it's a Spark issue or not. With the diagnosis support for the shuffle corruption, Spark itself can at least distinguish the cause between disk and network, which is very important for users.
### Does this PR introduce _any_ user-facing change?
Yes, users may know the cause of the shuffle corruption after this change.
### How was this patch tested?
Added tests.
Closes#33451 from Ngone51/SPARK-36206.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
[[SPARK-23243](https://issues.apache.org/jira/browse/SPARK-23243)] and [[SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341)] addressed cases of stage retries for indeterminate stage involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage for a shuffle ID. Therefore the changes explained below are necessary.
Core changes are summarized as follows:
1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency` which is monotonically increasing value tracking the temporal ordering of execution of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly make changes in the push-based shuffle protocol layer in `MergedShuffleFileManager`, `BlockStoreClient` passing the `shuffleMergeId` in order to keep track of the shuffle output in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` in the cases of a indeterministic stage execution
4. Deterministic stage will have `shuffleMergeId` set to 0 as no special handling is needed in this case and indeterminate stage will have `shuffleMergeId` starting from 1.
### Why are the changes needed?
New protocol changes are needed due to the reasons explained above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite, DAGSchedulerSuite, BlockIdSuite, ErrorHandlerSuite`
Closes#33034 from venkata91/SPARK-32923.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This is a follow-up of #33594 to fix the Java linter error.
### Why are the changes needed?
To recover GitHub Action.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the GitHub Action.
Closes#33601 from dongjoon-hyun/SPARK-36362.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix up some minor Java issues:
- Some int*int multiplications that widen to long maybe could overflow
- Unnecessarily non-static inner classes
- Some tests "catch (AssertionError)" and do nothing
- Manual array iteration vs very slightly faster/simpler foreach
- Incorrect generic types that just happen to not cause a runtime error
- Missed opportunities for try-close
- Mutable enums
- .. and a few other minor things
### Why are the changes needed?
Some are minor but clear fixes; some may have a marginal perf impact or avoid a bug later. Also: maybe avoid future PRs to address these one by one.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
Closes#33594 from srowen/SPARK-36362.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
The main change of this pr is try to use `computeIfAbsent` to simplify the process of put and get missing value in a map.
**Before**
```java
V oldValue = map.get(key);
if (oldValue == null) {
V newValue = functionToProduceMissValue
map.put(key, newValue);
}
```
**After**
```java
map.computeIfAbsent(key, functionToProduceMissValue);
```
### Why are the changes needed?
Simplify code use Java 8 API.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#33558 from LuciferYang/SPARK-36326.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514)
In this PR, the following changes are made:
1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse.
2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push.
3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push.
### Why are the changes needed?
To make code cleaner without sacrificing backward compatibility.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes#33340 from Victsm/SPARK-32915-followup.
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
`ExternalBlockHandler` exposes 4 metrics which are Dropwizard `Timer` metrics, and are named with a `millis` suffix:
```
private final Timer openBlockRequestLatencyMillis = new Timer();
private final Timer registerExecutorRequestLatencyMillis = new Timer();
private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
```
However these Dropwizard Timers by default use nanoseconds ([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)).
This causes `YarnShuffleServiceMetrics` to expose confusingly-named metrics like `openBlockRequestLatencyMillis_nanos_max` (the actual values are currently in nanos).
This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which accepts a `TimeUnit` at creation time and exposes timing information using this time unit when values are read. Internally, values are still stored with nanosecond-level precision. The `Timer` metrics within `ExternalBlockHandler` are updated to use the new class with milliseconds as the unit. The logic to include the `nanos` suffix in the metric name within `YarnShuffleServiceMetrics` has also been removed, with the assumption that the metric name itself includes the units.
### Does this PR introduce _any_ user-facing change?
Yes, there are two changes.
First, the names for metrics exposed by `ExternalBlockHandler` via `YarnShuffleServiceMetrics` such as `openBlockRequestLatencyMillis_nanos_max` and `openBlockRequestLatencyMillis_nanos_50thPercentile` have been changed to remove the `_nanos` suffix. This would be considered a breaking change, but these names were only exposed as part of #32388, which has not yet been released (slated for 3.2.0). New names are like `openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis_50thPercentile`
Second, the values of the metrics themselves have changed, to expose milliseconds instead of nanoseconds. Note that this does not affect metrics such as `openBlockRequestLatencyMillis_count` or `openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics (`max`, `median`, percentiles, etc.). For the YARN case, these metrics were also introduced by #32388, and thus also have not yet been released. It was possible for the nanosecond values to be consumed by some other metrics reporter reading the Dropwizard metrics directly, but I'm not aware of any such usages.
### How was this patch tested?
Unit tests have been updated.
Closes#33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
Once the shuffle is cleaned up by the `ContextCleaner`, the shuffle files are deleted by the executors. In this case, the push of the shuffle data by the executors can throw `FileNotFoundException`s because the shuffle files are deleted. When this exception is thrown from the `shuffle-block-push-thread`, it causes the executor to exit. Both the `shuffle-block-push` threads and the netty event-loops will encounter `FileNotFoundException`s in this case. The fix here stops these threads from pushing more blocks when they encounter `FileNotFoundException`. When the exception is from the `shuffle-block-push-thread`, it will get handled and logged as warning instead of failing the executor.
### Why are the changes needed?
This fixes the bug which causes executor to exits when they are instructed to clean up shuffle data.
Below is the stacktrace of this exception:
```
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer
{file=********/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=*******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: ******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit to verify no more data is pushed when `FileNotFoundException` is encountered. Have also verified in our environment.
Closes#33477 from otterc/SPARK-36255.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
This commit fixes the use of the "o.appAttemptId" variable instead of the mistaken "appAttemptId" variable. The current situation is a comparison of identical values. Jira issue report SPARK-36273.
### What changes were proposed in this pull request?
This is a patch for SPARK-35546 which is needed for push-based shuffle.
### Why are the changes needed?
A very minor fix of adding the reference from the other "FinalizeShuffleMerge".
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No unit tests were added. It's a pretty logical change.
Closes#33493 from almogtavor/patch-1.
Authored-by: Almog Tavor <70065337+almogtavor@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of the change:
When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt.
This PR also refactored the management of the merged shuffle information to avoid concurrency issues.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Closes#33078 from zhouyejoe/SPARK-35546.
Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR modifies comment for `UTF8String.trimAll` and`sql-migration-guide.mld`.
The comment for `UTF8String.trimAll` says like as follows.
```
Trims whitespaces ({literal <=} ASCII 32) from both ends of this string.
```
Similarly, `sql-migration-guide.md` mentions about the behavior of `cast` like as follows.
```
In Spark 3.0, when casting string value to integral types(tinyint, smallint, int and bigint),
datetime types(date, timestamp and interval) and boolean type,
the leading and trailing whitespaces (<= ASCII 32) will be trimmed before converted to these type values,
for example, `cast(' 1\t' as int)` results `1`, `cast(' 1\t' as boolean)` results `true`,
`cast('2019-10-10\t as date)` results the date value `2019-10-10`.
In Spark version 2.4 and below, when casting string to integrals and booleans,
it does not trim the whitespaces from both ends; the foregoing results is `null`,
while to datetimes, only the trailing spaces (= ASCII 32) are removed.
```
But SPARK-32559 (#29375) changed the behavior and only whitespace ASCII characters will be trimmed since Spark 3.0.1.
### Why are the changes needed?
To follow the previous change.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed the document built by the following command.
```
SKIP_API=1 bundle exec jekyll build
```
Closes#33287 from sarutak/fix-utf8string-trim-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
There was a regression since Spark started storing large remote files on disk (https://issues.apache.org/jira/browse/SPARK-22062). In 2018 a refactoring introduced a hidden reference preventing the auto-deletion of the files (a97001d217 (diff-42a673b8fa5f2b999371dc97a5de7ebd2c2ec19447353d39efb7e8ebc012fe32L1677)). Since then all underlying files of DownloadFile instances are kept on disk for the duration of the Spark application which sometimes results in "no space left" errors.
`ReferenceWithCleanup` class uses `file` (the `DownloadFile`) in `cleanUp(): Unit` method so it has to keep a reference to it which prevents it from being garbage-collected.
```
def cleanUp(): Unit = {
logDebug(s"Clean up file $filePath")
if (!file.delete()) { <--- here
logDebug(s"Fail to delete file $filePath")
}
}
```
### Why are the changes needed?
Long-running Spark applications require freeing resources when they are not needed anymore, and iterative algorithms could use all the disk space quickly too.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test in BlockManagerSuite and tested manually.
Closes#33251 from dtarima/fix-download-file-cleanup.
Authored-by: Denis Tarima <dtarima@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This PR aims to update `master` branch version to 3.3.0-SNAPSHOT.
### Why are the changes needed?
Start to prepare Apache Spark 3.3.0 and the published snapshot version should not conflict with `branch-3.2`.
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Pass the CIs.
Closes#33196 from dongjoon-hyun/SPARK-35996.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This adds two new additional metrics to `ExternalBlockHandler`:
- `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them
- `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS
Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`.
### Why are the changes needed?
Currently `ExternalBlockHandler` exposes some useful metrics, but is lacking around metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs.
For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves are collected detailed histograms of timing information. We should expose this information for better observability.
### Does this PR introduce _any_ user-facing change?
Yes, there are two entirely new metrics for the ESS, as documented in `monitoring.md`. Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information.
### How was this patch tested?
New unit tests are added to verify that new metrics are showing up as expected.
We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues.
Closes#32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This adds support in the ESS to serve merged shuffle block meta and data requests to executors.
This change is needed for fetching remote merged shuffle data from the remote shuffle services. This is part of push-based shuffle SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
This change introduces new messages between clients and the external shuffle service:
1. `MergedBlockMetaRequest`: The client sends this to external shuffle to get the meta information for a merged block. The response to this is one of these :
- `MergedBlockMetaSuccess` : contains request id, number of chunks, and a `ManagedBuffer` which is a `FileSegmentBuffer` backed by the merged block meta file.
- `RpcFailure`: this is sent back to client in case of failure. This is an existing message.
2. `FetchShuffleBlockChunks`: This is similar to `FetchShuffleBlocks` message but it is to fetch merged shuffle chunks instead of blocks.
### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com
Closes#32811 from otterc/SPARK-35671.
Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR aims to promote `LevelDBSuite.IntKeyType` class to a normal class to isolate `InMemoryIteratorSuite` from `LevelDBSuite`.
### Why are the changes needed?
We have the following test suite hierarchy.
```
DBIteratorSuite
- InMemoryIteratorSuite
- LevelDBIteratorSuite
```
`DBIteratorSuite.testRefWithIntNaturalKey` depends on `LevelDBSuite` and `InMemoryIteratorSuite` derived it. `InMemoryIteratorSuite` should not depend not `LevelDB`-specific stuff. This PR will make it sure.
```
public void testRefWithIntNaturalKey() throws Exception {
LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
...
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
```
$ build/sbt "kvstore/test"
```
Closes#32971 from dongjoon-hyun/SPARK-35824.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This change is for [SPARK-35757](https://issues.apache.org/jira/browse/SPARK-35757) and does the following:
1. adds bitwise AND operation to BitArray (similar to existing `putAll` method)
2. adds an intersect operation for combining bloom filters using bitwise AND operation (similar to existing `mergeInPlace` method).
### Why are the changes needed?
The current bloom filter library only allows combining two bloom filters using OR operation. It is useful to have AND operation as well.
### Does this PR introduce _any_ user-facing change?
No, just adds new methods.
### How was this patch tested?
Just the existing tests.
Closes#32907 from kudhru/master.
Lead-authored-by: kudhru <gargdhruv36@gmail.com>
Co-authored-by: Dhruv Kumar <kudhru@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of changes:
Executor will create the merge directories under the application temp directory provided by YARN. The access control of the folder will be set to 770, where Shuffle Service can create merged shuffle files and write merge shuffle data in to those files.
Serve the merged shuffle blocks fetch request, read the merged shuffle blocks.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com
Closes#32007 from zhouyejoe/SPARK-33350.
Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Remove some useless code in the ExternalBlockHandler.
### Why are the changes needed?
There is some useless code in the ExternalBlockHandler, so we may remove it.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Exist unittests.
Closes#32571 from weixiuli/SPARK-35424.
Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes a workaround to address the Netty OOM issue (SPARK-24989, SPARK-27991):
Basically, `ShuffleBlockFetcherIterator` would catch the `OutOfDirectMemoryError` from Netty and then set a global flag for the shuffle module. Any pending fetch requests would be deferred if there're in-flight requests until the flag is unset. And the flag will be unset when there's a fetch request succeed.
Note that catching the Netty OOM rather than abort the application is feasible because Netty manage its own memory region (offheap by default) separately. So Netty OOM doesn't mean the memory shortage of Spark.
### Why are the changes needed?
The Netty OOM issue is a very corner case. It usually happens in the large-scale cluster, where a reduce task could fetch shuffle blocks from hundreds of nodes concurrently in a short time. Internally, we found a cluster that has created 260+ clients within 6s before throwing Netty OOM.
Although Spark has configurations, e.g., `spark.reducer.maxReqsInFlight` to tune the number of concurrent requests, it's usually not a easy decision for the user to set a reasonable value regarding the workloads, machine resources, etc. But with this fix, Spark would heal the Netty memory issue itself without any specific configurations.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes#32287 from Ngone51/SPARK-27991.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR replaces `toStringHelper`, an API which breaks in Guava 27.
### Why are the changes needed?
SPARK-30272 (#26911) removed usages which breaks in Guava 27 but `toStringHelper` is instroduced again.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Build successfully finished with the following command.
```
build/sbt -Dguava.version=27.0-jre -Phive -Phive-thriftserver -Pyarn -Pmesos -Pkubernetes -Phadoop-cloud -Pdocker-integration-tests -Pkubernetes-integration-tests -Pkinesis-asl -Pspark-ganglia-lgpl package
```
Closes#32567 from sarutak/remove-old-guava-usage.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
There is a potential Netty memory leak in TransportResponseHandler.
### Why are the changes needed?
Fix a potential Netty memory leak in TransportResponseHandler.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
NO
Closes#31942 from weixiuli/SPARK-34834.
Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This PR fixes two tests below:
https://github.com/apache/spark/runs/2320161984
```
[info] YarnShuffleIntegrationSuite:
[info] org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite *** ABORTED *** (228 milliseconds)
[info] org.apache.hadoop.yarn.exceptions.YarnRuntimeException: org.apache.hadoop.yarn.webapp.WebAppException: Error starting http server
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.startResourceManager(MiniYARNCluster.java:373)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.access$300(MiniYARNCluster.java:128)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster$ResourceManagerWrapper.serviceStart(MiniYARNCluster.java:503)
[info] at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
[info] at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:121)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:322)
[info] at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
[info] at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:95)
...
[info] Cause: java.net.BindException: Port in use: fv-az186-831:0
[info] at org.apache.hadoop.http.HttpServer2.constructBindException(HttpServer2.java:1231)
[info] at org.apache.hadoop.http.HttpServer2.bindForSinglePort(HttpServer2.java:1253)
[info] at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:1316)
[info] at org.apache.hadoop.http.HttpServer2.start(HttpServer2.java:1167)
[info] at org.apache.hadoop.yarn.webapp.WebApps$Builder.start(WebApps.java:449)
[info] at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.startWepApp(ResourceManager.java:1247)
[info] at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceStart(ResourceManager.java:1356)
[info] at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.startResourceManager(MiniYARNCluster.java:365)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.access$300(MiniYARNCluster.java:128)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster$ResourceManagerWrapper.serviceStart(MiniYARNCluster.java:503)
[info] at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
[info] at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:121)
[info] at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:322)
[info] at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
[info] at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:95)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61)
...
```
https://github.com/apache/spark/runs/2323342094
```
[info] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadSecret started
[error] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadSecret failed: java.lang.AssertionError: Connecting to /10.1.0.161:39895 timed out (120000 ms), took 120.081 sec
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadSecret(ExternalShuffleSecuritySuite.java:85)
[error] ...
[info] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadAppId started
[error] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadAppId failed: java.lang.AssertionError: Connecting to /10.1.0.198:44633 timed out (120000 ms), took 120.08 sec
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testBadAppId(ExternalShuffleSecuritySuite.java:76)
[error] ...
[info] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testValid started
[error] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testValid failed: java.io.IOException: Connecting to /10.1.0.119:43575 timed out (120000 ms), took 120.089 sec
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:285)
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
[error] at org.apache.spark.network.shuffle.ExternalBlockStoreClient.registerWithShuffleServer(ExternalBlockStoreClient.java:211)
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.validate(ExternalShuffleSecuritySuite.java:108)
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testValid(ExternalShuffleSecuritySuite.java:68)
[error] ...
[info] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testEncryption started
[error] Test org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testEncryption failed: java.io.IOException: Connecting to /10.1.0.248:35271 timed out (120000 ms), took 120.014 sec
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:285)
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
[error] at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
[error] at org.apache.spark.network.shuffle.ExternalBlockStoreClient.registerWithShuffleServer(ExternalBlockStoreClient.java:211)
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.validate(ExternalShuffleSecuritySuite.java:108)
[error] at org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testEncryption(ExternalShu
```
For Yarn cluster suites, its difficult to fix. This PR makes it skipped if it fails to bind.
For shuffle related suites, it uses local host
### Why are the changes needed?
To make the tests stable
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Its tested in GitHub Actions: https://github.com/HyukjinKwon/spark/runs/2340210765Closes#32126 from HyukjinKwon/SPARK-35002-followup.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
Add overflow check before do `new byte[]`.
### Why are the changes needed?
Avoid overflow in extreme case.
### Does this PR introduce _any_ user-facing change?
Maybe yes, the error msg changed if overflow.
### How was this patch tested?
Pass CI.
Closes#32142 from ulysses-you/SPARK-35041.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Add check if the byte length over `int`.
### Why are the changes needed?
We encounter a very extreme case with expression `concat_ws`, and the error msg is
```
Caused by: java.lang.NegativeArraySizeException
at org.apache.spark.unsafe.types.UTF8String.concatWs
```
Seems the `UTF8String.concat` has already done the length check at [#21064](https://github.com/apache/spark/pull/21064), so it's better to add in `concatWs`.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
It's too heavy to add the test.
Closes#32106 from ulysses-you/SPARK-35005.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Add a new config, `spark.shuffle.service.name`, which allows for Spark applications to look for a YARN shuffle service which is defined at a name other than the default `spark_shuffle`.
Add a new config, `spark.yarn.shuffle.service.metrics.namespace`, which allows for configuring the namespace used when emitting metrics from the shuffle service into the NodeManager's `metrics2` system.
Add a new mechanism by which to override shuffle service configurations independently of the configurations in the NodeManager. When a resource `spark-shuffle-site.xml` is present on the classpath of the shuffle service, the configs present within it will be used to override the configs coming from `yarn-site.xml` (via the NodeManager).
### Why are the changes needed?
There are two use cases which can benefit from these changes.
One use case is to run multiple instances of the shuffle service side-by-side in the same NodeManager. This can be helpful, for example, when running a YARN cluster with a mixed workload of applications running multiple Spark versions, since a given version of the shuffle service is not always compatible with other versions of Spark (e.g. see SPARK-27780). With this PR, it is possible to run two shuffle services like `spark_shuffle` and `spark_shuffle_3.2.0`, one of which is "legacy" and one of which is for new applications. This is possible because YARN versions since 2.9.0 support the ability to run shuffle services within an isolated classloader (see YARN-4577), meaning multiple Spark versions can coexist.
Besides this, the separation of shuffle service configs into `spark-shuffle-site.xml` can be useful for administrators who want to change and/or deploy Spark shuffle service configurations independently of the configurations for the NodeManager (e.g., perhaps they are owned by two different teams).
### Does this PR introduce _any_ user-facing change?
Yes. There are two new configurations related to the external shuffle service, and a new mechanism which can optionally be used to configure the shuffle service. `docs/running-on-yarn.md` has been updated to provide user instructions; please see this guide for more details.
### How was this patch tested?
In addition to the new unit tests added, I have deployed this to a live YARN cluster and successfully deployed two Spark shuffle services simultaneously, one running a modified version of Spark 2.3.0 (which supports some of the newer shuffle protocols) and one running Spark 3.1.1. Spark applications of both versions are able to communicate with their respective shuffle services without issue.
Closes#31936 from xkrogen/xkrogen-SPARK-34828-shufflecompat-config-from-classpath.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
Currently, when a connection for TransportClient is marked as idled and closed, we suggest users adjust `spark.network.timeout` for all transport modules. As a lot of timeout configs will fallback to the `spark.network.timeout`, this could be a piece of overkill advice, we should give a more targeted one with `spark.${moduleName}.io.connectionTimeout`
### Why are the changes needed?
better advise for overloaded network traffic cases
### Does this PR introduce _any_ user-facing change?
yes, when a connection is zombied and closed by spark internally, users can use a more targeted config to tune their jobs
### How was this patch tested?
Just log and doc. Passing Jenkins and GA
Closes#31990 from yaooqinn/SPARK-34894.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916).
There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
- `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
- `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time StreamCallback.onFailure() will be invoked.
2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.
Also adding additional changes that improve the code.
1. Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.
### Why are the changes needed?
These are bug fixes and simplify the code.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com
Closes#31934 from otterc/SPARK-32916-followup.
Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
The main change of this pr as follows:
- Use `org.junit.Assert.assertThrows(String, Class, ThrowingRunnable)` method instead of `ExpectedException.none()`
- Use `org.hamcrest.MatcherAssert.assertThat()` method instead of `org.junit.Assert.assertThat(T, org.hamcrest.Matcher<? super T>)`
### Why are the changes needed?
Clean up deprecated API usage
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#31815 from LuciferYang/SPARK-34722.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>