### What changes were proposed in this pull request?
This is the followup from https://github.com/apache/spark/pull/33012#discussion_r659440833, where we want to add `Unstable` to `FileCommitProtocol`, to give people a better idea of API.
### Why are the changes needed?
Make it easier for people to follow and understand code. Clean up code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, as no real logic change.
Closes#33148 from c21/bucket-followup.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is the shuffle fetch side change where executors can fetch local/remote push-merged shuffle data from shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
The change adds support to the `ShuffleBlockFetchIterator` to fetch push-merged block meta and shuffle chunks from local and remote ESS. If the fetch of any of these fails, then the iterator fallsback to fetch the original shuffle blocks that belonged to the push-merged block.
### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
### Does this PR introduce _any_ user-facing change?
When push-based shuffle is turned on then that will fetch push-merged blocks from the remote shuffle service. The client logs will indicate this.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com
Closes#32140 from otterc/SPARK-32922.
Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: otterc <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR aims to upgrade ASM to 9.1
### Why are the changes needed?
The latest `xbean-asm9-shaded` is built with ASM 9.1.
- https://mvnrepository.com/artifact/org.apache.xbean/xbean-asm9-shaded/4.20
- 5e0e3c0c64/pom.xml (L67)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes#33130 from dongjoon-hyun/SPARK-35928.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Currently, we calculate the `remoteBlockBytes` based on the original block info list. It's not efficient. Usually, it costs more ~25% time to be spent here.
If the original reducer size is big but the actual reducer size is small due to automatically partition coalescing of AQE, the reducer will take more time to calculate `remoteBlockBytes`.
We can reduce this cost via remote requests which contain merged block info lists.
### Why are the changes needed?
improve task performance
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new unit tests and verified manually.
Closes#33109 from yaooqinn/SPARK-35910.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This adds two new additional metrics to `ExternalBlockHandler`:
- `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them
- `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS
Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`.
### Why are the changes needed?
Currently `ExternalBlockHandler` exposes some useful metrics, but is lacking around metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs.
For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves are collected detailed histograms of timing information. We should expose this information for better observability.
### Does this PR introduce _any_ user-facing change?
Yes, there are two entirely new metrics for the ESS, as documented in `monitoring.md`. Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information.
### How was this patch tested?
New unit tests are added to verify that new metrics are showing up as expected.
We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues.
Closes#32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR fixes perf regression at the executor side when creating fetch requests with large initial partitions
![image](https://user-images.githubusercontent.com/8326978/123270865-dd21e800-d532-11eb-8447-ad80e47b034f.png)
In NetEase, we had an online job that took `45min` to "fetch" about 100MB of shuffle data, which actually turned out that it was just collecting fetch requests slowly. Normally, such a task should finish in seconds.
See the `DEBUG` log
```
21/06/22 11:52:26 DEBUG BlockManagerStorageEndpoint: Sent response: 0 to kyuubi.163.org:
21/06/22 11:53:05 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3941440 at BlockManagerId(12, .., 43559, None) with 19 blocks
21/06/22 11:53:44 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3716400 at BlockManagerId(20, .., 38287, None) with 18 blocks
21/06/22 11:54:41 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 4559280 at BlockManagerId(6, .., 39689, None) with 22 blocks
21/06/22 11:55:08 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3120160 at BlockManagerId(33, .., 39449, None) with 15 blocks
```
I also create a test case locally with my local laptop docker env to give some reproducible cases.
```
bin/spark-sql --conf spark.kubernetes.file.upload.path=./ --master k8s://https://kubernetes.docker.internal:6443 --conf spark.kubernetes.container.image=yaooqinn/spark:v20210624-5 -c spark.kubernetes.context=docker-for-desktop_1 --num-executors 5 --driver-memory 5g --conf spark.kubernetes.executor.podNamePrefix=sparksql
```
```sql
SET spark.sql.adaptive.enabled=true;
SET spark.sql.shuffle.partitions=3000;
SELECT /*+ REPARTITION */ 1 as pid, id from range(1, 1000000, 1, 500);
SELECT /*+ REPARTITION(pid, id) */ 1 as pid, id from range(1, 1000000, 1, 500);
```
### Why are the changes needed?
fix perf regression which was introduced by SPARK-29292 (3ad4863673) in v3.1.0.
3ad4863673 is for support compilation with scala 2.13 but the performance losses is huge. We need to consider backporting this PR to branch 3.1.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Mannully,
#### before
```log
21/06/23 13:54:22 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/23 13:54:38 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2314708 at BlockManagerId(2, 10.1.3.114, 36423, None) with 86 blocks
21/06/23 13:54:59 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2636612 at BlockManagerId(3, 10.1.3.115, 34293, None) with 87 blocks
21/06/23 13:55:18 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2508706 at BlockManagerId(4, 10.1.3.116, 41869, None) with 90 blocks
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2350854 at BlockManagerId(5, 10.1.3.117, 45787, None) with 85 blocks
21/06/23 13:55:34 INFO ShuffleBlockFetcherIterator: Getting 438 (11.8 MiB) non-empty blocks including 90 (2.5 MiB) local and 0 (0.0 B) host-local and 348 (9.4 MiB) remote blocks
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 87 blocks (2.5 MiB) from 10.1.3.115:34293
21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.115:34293 after 1 ms (0 ms spent in bootstraps)
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 90 blocks (2.4 MiB) from 10.1.3.116:41869
21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.116:41869 after 2 ms (0 ms spent in bootstraps)
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 85 blocks (2.2 MiB) from 10.1.3.117:45787
```
```log
21/06/23 14:00:45 INFO MapOutputTracker: Broadcast outputstatuses size = 411, actual size = 828997
21/06/23 14:00:45 INFO MapOutputTrackerWorker: Got the map output locations
21/06/23 14:00:45 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/23 14:00:55 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1894389 at BlockManagerId(2, 10.1.3.114, 36423, None) with 99 blocks
21/06/23 14:01:04 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1919993 at BlockManagerId(3, 10.1.3.115, 34293, None) with 100 blocks
21/06/23 14:01:14 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1977186 at BlockManagerId(5, 10.1.3.117, 45787, None) with 103 blocks
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1938336 at BlockManagerId(4, 10.1.3.116, 41869, None) with 101 blocks
21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Getting 500 (9.1 MiB) non-empty blocks including 97 (1820.3 KiB) local and 0 (0.0 B) host-local and 403 (7.4 MiB) remote blocks
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 101 blocks (1892.9 KiB) from 10.1.3.116:41869
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 103 blocks (1930.8 KiB) from 10.1.3.117:45787
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 99 blocks (1850.0 KiB) from 10.1.3.114:36423
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 100 blocks (1875.0 KiB) from 10.1.3.115:34293
21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 37889 ms
```
#### After
```log
21/06/24 13:01:16 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call blockInfos.map(_._2).sum: 40 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_9_2990_2997/9: 0 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_15_2395_2997/15: 0 ms
```
Closes#33063 from yaooqinn/SPARK-35879.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Fix the consistent break on Scala 2.13 build caused by the PR https://github.com/apache/spark/pull/32767
### Why are the changes needed?
Fix the consistent break.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#33084 from xuanyuanking/SPARK-35628-follow.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent with the existing behavior where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrainedExecutorBackend` overrides this to construct the user classpath from the existing `APP_JAR` and `SECONDARY_JARS` configs.
### Why are the changes needed?
User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor which then creates a classloader out of the URLs. Currently in the case of YARN, this list of JARs is crafted by the Driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-path /path/to/myjar.jar`. This can cause extremely long argument lists when there are many JARs, which can cause the OS argument length to be exceeded, typically manifesting as the error message:
> /bin/bash: Argument list too long
A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list using the configurations instead resolves this issue.
### Does this PR introduce _any_ user-facing change?
No, except for fixing the bug, allowing for larger JAR lists to be passed successfully. Configuration of JARs is identical to before.
### How was this patch tested?
New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success.
Closes#32810 from xkrogen/xkrogen-SPARK-35672-classpath-scalable.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
This patches the hadoop configuration so that fs.s3a.endpoint is set to
s3.amazonaws.com if neither it nor fs.s3a.endpoint.region is set.
This stops S3A Filesystem creation failing with the error
"Unable to find a region via the region provider chain."
in some non-EC2 deployments.
See: HADOOP-17771.
when spark options are propagated to the hadoop configuration
in SparkHadoopUtils. the fs.s3a.endpoint value is set to
"s3.amazonaws.com" if unset and no explicit region
is set in fs.s3a.endpoint.region.
### Why are the changes needed?
A regression in Hadoop 3.3.1 has surfaced which causes S3A filesystem
instantiation to fail outside EC2 deployments if the host lacks
a CLI configuration in ~/.aws/config declaring the region, or
the `AWS_REGION` environment variable
HADOOP-17771 fixes this in Hadoop-3.3.2+, but
this spark patch will correct the behavior when running
Spark with the 3.3.1 artifacts.
It is harmless for older versions and compatible
with hadoop releases containing the HADOOP-17771
fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New tests to verify propagation logic from spark conf to hadoop conf.
Closes#33064 from steveloughran/SPARK-35878-regions.
Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
The implementation for the load operation of RocksDBFileManager.
### Why are the changes needed?
Provide the functionality of loading all necessary files for specific checkpoint versions from DFS to the given local directory.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
Closes#32767 from xuanyuanking/SPARK-35628.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This PR is to introduce a new sets of APIs `newTaskTempFile` and `newTaskTempFileAbsPath` inside `FileCommitProtocol`, to allow more flexible file naming of Spark output. The major change is to pass `FileNameSpec` into `FileCommitProtocol`, instead of original `ext` (currently having `prefix` and `ext`), to allow individual `FileCommitProtocol` implementation comes up with more flexible file names (e.g. has a custom `prefix`) for Hive/Presto bucketing - https://github.com/apache/spark/pull/30003. Provide a default implementations of the added APIs, so all existing implementation of `FileCommitProtocol` is NOT being broken.
### Why are the changes needed?
To make commit protocol more flexible in terms of Spark output file name.
Pre-requisite of https://github.com/apache/spark/pull/30003.
### Does this PR introduce _any_ user-facing change?
Yes for developers who implement/run custom implementation of `FileCommitProtocol`. They can choose to implement for the newly added API.
### How was this patch tested?
Existing unit tests as this is just adding an API.
Closes#33012 from c21/commit-protocol-api.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In a secure Yarn cluster, even though HBase or Kafka, or Hive services are not used in the user application, yarn client unnecessarily trying to generate Delegations token from these services. This will add additional delays while submitting spark application in a yarn cluster
Also during HBase delegation token generation step in the application submit stage, HBaseDelegationTokenProvider prints a full Exception Stack trace and it causes a noisy warning.
Apart from printing exception stack trace, Application submission taking more time as it retries connection to HBase master multiple times before it gives up. So, if HBase is not used in the user Applications, it is better to suggest User disable HBase Delegation Token generation.
This PR aims to avoid printing full Exception Stack by just printing just Exception name and also add a suggestion message to disable `Delegation Token generation` if service is not used in the Spark Application.
eg: `If HBase is not used, set spark.security.credentials.hbase.enabled to false`
### Why are the changes needed?
To avoid printing full Exception stack trace in WARN log
#### Before the fix
----------------
```
spark-shell --master yarn
.......
.......
21/06/12 14:29:41 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokensWithHBaseConn(HBaseDelegationT
okenProvider.scala:93)
at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokens(HBaseDelegationTokenProvider.
scala:60)
at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
166)
at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
164)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.
scala:164)
```
#### After the fix
------------
```
spark-shell --master yarn
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/13 02:10:02 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase due to java.lang.reflect.InvocationTargetException Retrying to fetch HBase security token with hbase connection parameter.
21/06/13 02:10:40 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false
21/06/13 02:10:47 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
```
### Does this PR introduce _any_ user-facing change?
Yes, in the log, it avoids printing full Exception stack trace.
Instread prints this.
**WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false**
### How was this patch tested?
Tested manually as it can be verified only in a secure cluster
Closes#32894 from vinodkc/br_fix_Hbase_DT_Exception_stack_printing.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to add `fs.s3a.downgrade.syncable.exceptions=true` if it's not provided by the users.
### Why are the changes needed?
Currently, event log feature is broken with Hadoop 3.2 profile due to `UnsupportedOperationException` because [HADOOP-17597](https://issues.apache.org/jira/browse/HADOOP-17597) changes the default behavior to throw exceptions by default since Apache Hadoop 3.3.1. We know that it's because `EventLogFileWriters` is using `hadoopDataStream.foreach(_.hflush())`, but this PR aims to provide the same UX across Spark distributions with Hadoop2/Hadoop 3 at Apache Spark 3.2.0.
```
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/
...
21/06/23 17:34:35 ERROR SparkContext: Error initializing SparkContext.
java.lang.UnsupportedOperationException: S3A streams are not Syncable. See HADOOP-17597.
```
### Does this PR introduce _any_ user-facing change?
Yes, this will recover the existing behavior.
### How was this patch tested?
Manual.
```
$ build/sbt package -Phadoop-3.2 -Phadoop-cloud
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/
...(working)...
```
If the users provide the configuration explicitly, it will return to the original behavior throwing exceptions.
```
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/ -c spark.hadoop.fs.s3a.downgrade.syncable.exceptions=false
...
21/06/23 17:44:41 ERROR Main: Failed to initialize Spark session.
java.lang.UnsupportedOperationException: S3A streams are not Syncable. See HADOOP-17597.
```
Closes#33044 from dongjoon-hyun/SPARK-35868.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Wrapping `JHashMap[BlockId, BlockStatus]` (used in `blockStatusByShuffleService`) into a new class `BlockStatusPerBlockId` which removes the reference to the map when all the persisted blocks are removed.
### Why are the changes needed?
With https://github.com/apache/spark/pull/32790 a bug is introduced when all the persisted blocks are removed we remove the HashMap which already shared by the block manger infos but when new block is persisted this map is needed to be used again for storing the data (and this HashMap must be the same which shared by the block manger infos created for registered block managers running on the same host where the external shuffle service is).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Extending `BlockManagerInfoSuite` with test which removes all the persisted blocks then adds another one.
Closes#33020 from attilapiros/SPARK-35543-2.
Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This's a follow-up of https://github.com/apache/spark/pull/30710.
Rename the conf from `spark.speculation.min.threshold` to `spark.speculation.minTaskRuntime`.
### Why are the changes needed?
To follow the [config naming policy](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala#L21).
### Does this PR introduce _any_ user-facing change?
No (since Spark 3.2 hasn't been released).
### How was this patch tested?
Pass existing tests.
Closes#33037 from Ngone51/spark-33741-followup.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
It is a trivial change to remove the reference to an incorrect configuration for push-based shuffle from a test suite.
Ref: https://github.com/apache/spark/pull/30312
With SPARK-32917, `ShuffleBlockPusher` and its test suite was introduced. `ShuffleBlockPusher` is created only when push-based shuffle is enabled and the tests in `ShuffleBlockPusherSuite` are just testing the functionality in the pusher. So there is no need to have these configs enabled in these test.
### Why are the changes needed?
This change removes an incorrect configuration from the test suite.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This change just removes an incorrect configuration from the test suite so haven't added any UTs for it.
Closes#32992 from otterc/SPARK-35836.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR fixes a memory leak in ExecutorAllocationListener.
### Why are the changes needed?
Dynamic allocation stops working under high load (~100 tasks/s, ~5 stages/s) in long-lived (~10 days) spark applications. This PR addresses the problem.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual tests. The patch fixed dynamic allocation in production cluster.
Closes#32526 from VasilyKolpakov/SPARK-35391_fix_ExecutorAllocationListener.
Authored-by: Vasily Kolpakov <vasilykolpakov@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
This adds support in the ESS to serve merged shuffle block meta and data requests to executors.
This change is needed for fetching remote merged shuffle data from the remote shuffle services. This is part of push-based shuffle SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
This change introduces new messages between clients and the external shuffle service:
1. `MergedBlockMetaRequest`: The client sends this to external shuffle to get the meta information for a merged block. The response to this is one of these :
- `MergedBlockMetaSuccess` : contains request id, number of chunks, and a `ManagedBuffer` which is a `FileSegmentBuffer` backed by the merged block meta file.
- `RpcFailure`: this is sent back to client in case of failure. This is an existing message.
2. `FetchShuffleBlockChunks`: This is similar to `FetchShuffleBlocks` message but it is to fetch merged shuffle chunks instead of blocks.
### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com
Closes#32811 from otterc/SPARK-35671.
Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
To make the test suite more robust, this PR aims to add a new trait, `LocalRootDirsTest`, by refactoring `SortShuffleSuite`'s helper functions and applying it to the following:
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
- KubernetesLocalDiskShuffleDataIOSuite
- LocalDirsSuite
- RDDCleanerSuite
- ALSCleanerSuite
In addition, this fixes a UT in `KubernetesLocalDiskShuffleDataIOSuite`.
### Why are the changes needed?
`ShuffleSuite` is extended by four classes but only `SortShuffleSuite` does the clean-up correctly.
```
ShuffleSuite
- SortShuffleSuite
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
```
Since `KubernetesLocalDiskShuffleDataIOSuite` is looking for the other storage directory, the leftover of `ShuffleSuite` causes flakiness.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2/2649/testReport/junit/org.apache.spark.shuffle/KubernetesLocalDiskShuffleDataIOSuite/recompute_is_not_blocked_by_the_recovery/
```
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 1.0 (TID 3) had a not serializable result: org.apache.spark.ShuffleSuite$NonJavaSerializableClass
...
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIOSuite.$anonfun$new$2(KubernetesLocalDiskShuffleDataIOSuite.scala:52)
```
For the other suites, the clean-up implementation is used but not complete. So, they are refactored to use new trait.
### Does this PR introduce _any_ user-facing change?
No, this is a test-only change.
### How was this patch tested?
Pass the CIs.
Closes#32986 from dongjoon-hyun/SPARK-35832.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Change primaryResource assertion from exact match to suffix match in case SparkSubmitSuite.`handles k8s cluster mode`
### Why are the changes needed?
When I run SparkSubmitSuite on MacOs 10.15.7, I got AssertionError for `handles k8s cluster mode` test after pr [SPARK-35691](https://issues.apache.org/jira/browse/SPARK-35691), due to `File(path).getCanonicalFile().toURI()` function with absolute path as parameter will return path begin with `/System/Volumes/Data` on MacOs higher tha 10.15.
eg. `/home/testjars.jar` will get `file:/System/Volumes/Data/home/testjars.jar`
In order to pass UT on MacOs higher than 10.15, we change the assertion into suffix match
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Pass the GitHub Action
2. Manually test
- environment: MacOs > 10.15
- commad: `build/mvn clean install -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -pl core -am -DwildcardSuites=org.apache.spark.deploy.SparkSubmitSuite -Dtest=none`
- Test result:
- before this pr, case failed with following exception:
`- handles k8s cluster mode *** FAILED ***
Some("file:/System/Volumes/Data/home/thejar.jar") was not equal to Some("file:/home/thejar.jar") (SparkSubmitSuite.scala:485)
Analysis:
Some(value: "file:/[System/Volumes/Data/]home/thejar.jar" -> "file:/[]home/thejar.jar")`
- after this pr, run all test successfully
Closes#32948 from toujours33/SPARK-35796.
Authored-by: toujours33 <wangyazhi@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
PySpark added pinned thread mode at https://github.com/apache/spark/pull/24898 to sync Python thread to JVM thread. Previously, one JVM thread could be reused which ends up with messed inheritance hierarchy such as thread local especially when multiple jobs run in parallel. To completely fix this, we should enable this mode by default.
### Why are the changes needed?
To correctly support parallel job submission and management.
### Does this PR introduce _any_ user-facing change?
Yes, now Python thread is mapped to JVM thread one to one.
### How was this patch tested?
Existing tests should cover it.
Closes#32429 from HyukjinKwon/SPARK-35303.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade `zstd-jni` to 1.5.0-2, which uses `zstd` version 1.5.0.
### Why are the changes needed?
Major improvements to Zstd support are targeted for the upcoming 3.2.0 release of Spark. Zstd 1.5.0 introduces significant compression (+25% to 140%) and decompression (~15%) speed improvements in benchmarks described in more detail on the releases page:
- https://github.com/facebook/zstd/releases/tag/v1.5.0
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Build passes build tests, but the benchmark tests seem flaky. I am unsure if this change is responsible. The error is:
```
Running org.apache.spark.rdd.CoalescedRDDBenchmark:
21/06/08 18:53:10 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
```
https://github.com/dchristle/spark/runs/2776123749?check_suite_focus=true
cc: dongjoon-hyun
Closes#32826 from dchristle/ZSTD150.
Lead-authored-by: David Christle <dchristle@squareup.com>
Co-authored-by: David Christle <dchristle@users.noreply.github.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In current EventLoggingListener, we won't write SparkListenerExecutorMetricsUpdate message to event log file at all
```
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
if (shouldLogStageExecutorMetrics) {
event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
// If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
// so record those peaks for all active stages.
// Otherwise, record the peaks for the matching stage.
if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
val metrics = metricsPerExecutor.getOrElseUpdate(
event.execId, new ExecutorMetrics())
metrics.compareAndUpdatePeakValues(newPeaks)
}
}
}
}
}
```
In history server's restful API about executor, we can get Executor's metrics but can't get all driver's metrics. Executor's executor metrics can be updated with TaskEnd event etc...
So in this pr, I add support to log SparkListenerExecutorMetricsUpdateEvent of `driver` when `spark.eventLog.logStageExecutorMetrics` is true.
### Why are the changes needed?
Make user can got driver's peakMemoryMetrics in SHS.
### Does this PR introduce _any_ user-facing change?
user can got driver's executor metrics in SHS's restful API.
### How was this patch tested?
Mannul test
Closes#31992 from AngersZhuuuu/SPARK-34898.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR cleans up the code of `BlockManagerDecommissioner`. It includes a few changes:
* Only create `BlockManagerDecommissioner` instance when shuffle or RDD blocks requires migration:
there's no need to create `BlockManagerDecommissioner` instance if only `STORAGE_DECOMMISSION_ENABLED=true` and to check blocks migration in `shutdownThread`.
* Shut down the migration thread more gracefully:
1. we'd better not log errors if the `BlockManagerDecommissioner.stop()` is invoked explicitly. But currently, users will see
<details>
<summary>error message</summary>
```
21/01/04 20:11:52 ERROR BlockManagerDecommissioner: Error while waiting for block to migrate
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:83)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
</details>
2. don't shut down a thread pool like below since `shutdown()` actually doesn't block to wait for running tasks finish:
```scala
executor.shutdown()
executor.shutdownNow()
```
* Avoid initiating `shuffleMigrationPool` when it's unnecessary:
Currently, it's always initiated even if shuffle block migration is disabled. (`BlockManagerDecommissioner.stop()` -> `stopOffloadingShuffleBlocks()` -> initiate `shuffleMigrationPool`)
* Unify the terminologies between `offload` and `migrate`:
replace `offload` with `migrate`
* Do not add back the shuffle blocks when it exceeds the max failure number:
this avoids unnecessary operations
* Do not try `decommissionRddCacheBlocks()` if we already know there are no available peers
* Clean up logs:
Currently, we have many different description for the same thing, which is not good for the user experience
* Other cleanups
### Why are the changes needed?
code clean up
### Does this PR introduce _any_ user-facing change?
Yes, users will not see misleading logs, e.g., the interrupted error.
### How was this patch tested?
Update a unite test since we change the behavior of creating the `BlockManagerDecommissioner` instance.
Other changes are only code cleanup so they won't cause behaviour change. So passing the existing tests should be enough.
Closes#31102 from Ngone51/stop-decommission-gracefully.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Cache commonly occurring duplicate Some objects in SQLMetrics by using a Guava cache and reusing the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also with AccumulatorV2 we have seen lot of Some(-1L) and Some(0L) occurrences in a heap dump that is naively interned by having reusing a already constructed Some(-1L) and Some(0L)
To give some context on the impact and the garbage got accumulated, below are the details of the complex spark job which we troubleshooted and figured out the bottlenecks. **tl;dr - In short, major issues were the accumulation of duplicate objects mainly from SQLMetrics.**
Greater than 25% of the 40G driver heap filled with (a very large number of) **duplicate**, immutable objects.
1. Very large number of **duplicate** immutable objects.
- Type of metric is represented by `'scala.Some("sql")'` - which is created for each metric.
- Fixing this reduced memory usage from 4GB to a few bytes.
2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate absence of metric)
- Individually the values are all immutable, but spark sql was creating a new instance each time.
- Intern'ing these resulted in saving ~4.5GB for a 40G heap.
3. Using string interpolation for metric names.
- Interpolation results in creation of a new string object.
- We end up with a very large number of metric names - though the number of unique strings is miniscule.
- ~7.5 GB in the 40 GB heap : which went down to a few KB's when fixed.
### Why are the changes needed?
To reduce overall driver memory footprint which eventually reduces the Full GC pauses.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Since these are memory related optimizations, unit tests are not added. These changes are added in our internal platform which made it possible for one of the complex spark job continuously failing to succeed along with other set of optimizations.
Closes#32754 from venkata91/SPARK-35613.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
`addFile/addJar/addDirectory` should put CanonicalFile
### Why are the changes needed?
I met the error below.
21/06/07 00:06:57 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-
core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar But actually, /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar* and * /*home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar are the same*.
But actually, `/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar`and `/home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar` are the same.
I think we should put the Canonical File in ConcurrentHashMap.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the CIs.
Closes#32845 from pingsutw/SPARK-35691.
Authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Bug fix for deadlock during the executor shutdown
### Why are the changes needed?
When a executor received a TERM signal, it (the second TERM signal) will lock java.lang.Shutdown class and then call Shutdown.exit() method to exit the JVM.
Shutdown will call SparkShutdownHook to shutdown the executor.
During the executor shutdown phase, RemoteProcessDisconnected event will be send to the RPC inbox, and then WorkerWatcher will try to call System.exit(-1) again.
Because java.lang.Shutdown has already locked, a deadlock has occurred.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test case "task reaper kills JVM if killed tasks keep running for too long" in JobCancellationSuite
Closes#32868 from wankunde/SPARK-35714.
Authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Previously, the following two commits allow driver-owned on-demand PVC reuse.
- SPARK-35182 Support driver-owned on-demand PVC
- SPARK-35416 Support PersistentVolumeClaim Reuse
This PR aims to recover the shuffle data on those remounted PVCs. The lifecycle of PVCs are tied to the one of Spark jobs. Since this is K8s specific feature, `ShuffleDataIO` plugin is used.
### Why are the changes needed?
Although Pod is killed, we can remount PVCs and recover some data from it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the newly added test cases.
Closes#32730 from dongjoon-hyun/SPARK-RECOVER-SHUFFLE-DATA.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of changes:
Executor will create the merge directories under the application temp directory provided by YARN. The access control of the folder will be set to 770, where Shuffle Service can create merged shuffle files and write merge shuffle data in to those files.
Serve the merged shuffle blocks fetch request, read the merged shuffle blocks.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com
Closes#32007 from zhouyejoe/SPARK-33350.
Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Summary of the changes made as part of this PR:
1. `DAGScheduler` changes to finalize a ShuffleMapStage which involves talking to all the shuffle mergers (`ExternalShuffleService`) and getting all the completed merge statuses.
2. Once the `ShuffleMapStage` finalization is complete, mark the `ShuffleMapStage` to be finalized which marks the stage as complete and subsequently letting the child stage start.
3. Also added the relevant tests to `DAGSchedulerSuite` for changes made as part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919)
Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Venkata krishnan Sowrirajan vsowrirajanlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
### Why are the changes needed?
Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests to DAGSchedulerSuite
Closes#30691 from venkata91/SPARK-32920.
Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32534#32534 proposed a use case to use `DeserializedMemoryEntry` to store off-heap data, and let Spark release the memory via the `AutoCloseable` interface. However, there is one more problem: `DeserializedMemoryEntry` always reports its size as on-heap size, which is inaccurate. If the Spark cluster is configured with small on-heap size and large off-heap size, this will trigger a lot of spilling.
This PR makes `DeserializedMemoryEntry` truly support off-heap data. Now the caller side can cache off-heap data with a new storage level `OFF_HEAP_ONLY_DESER`.
### Why are the changes needed?
correct the memory counting for off-heap data.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
updated test
Closes#32800 from cloud-fan/follow.
Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.
### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
Closes#32582 from xuanyuanking/SPARK-35436.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Currently spark.jars.xxx property keys (e.g. spark.jars.ivySettings and spark.jars.packages) are hardcoded in multiple places within Spark code across multiple modules. We should define them in config/package.scala and reference them in all other places.
### Why are the changes needed?
improvement
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
no
Closes#32746 from dgd-contributor/SPARK-35074_configs_should_be_moved_to_config_package.scala.
Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
In `BlockManagerMasterEndpoint` for the disk persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled) we are keeping track the block status entries by external shuffle service instances (so on YARN we are basically keeping them by nodes). This is the `blockStatusByShuffleService` member val. And when all the RDD blocks are removed for one external shuffle service instance then the key and the empty map can be removed from `blockStatusByShuffleService`.
### Why are the changes needed?
It is a small leak and I was asked to take care of it in https://github.com/apache/spark/pull/32114#discussion_r640270377.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually by adding a temporary log line to check `blockStatusByShuffleService` value before and after the `removeRdd` and run the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.
Closes#32790 from attilapiros/SPARK-35543.
Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
### What changes were proposed in this pull request?
This PR aims to change `DiskBlockManager` like the following to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```
### Why are the changes needed?
`SparkContext` creates
1. `SparkEnv` (with `BlockManager` and its `DiskBlockManager`)
2. loads `ShuffleDataIO`
3. initialize block manager.
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)
...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
_shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
_conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
}
...
_env.blockManager.initialize(_applicationId)
...
```
`DiskBlockManager` is created first at `BlockManager` constructor and we cannot change `deleteFilesOnStop` later at `ShuffleDataIO`. By switching to `var`, we can implement enhanced shuffle data management feature via `ShuffleDataIO` like https://github.com/apache/spark/pull/32730 .
```
val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
new DiskBlockManager(conf, deleteFilesOnStop)
}
```
### Does this PR introduce _any_ user-facing change?
No. This is a private class.
### How was this patch tested?
N/A
Closes#32784 from dongjoon-hyun/SPARK-35654.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32534 , and proposes to free the memory entry immediately instead of doing it in the backround asynchronously. The reason is:
1. It's a bit weird to free the resource in an asynchronous way.
2. We free the off-heap memory entry in the same thread, and it's better to be consistent with it.
3. We can simplify the code quite a bit.
This PR also simplifies the tests to reuse the class definition.
### Why are the changes needed?
code simplification
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#32743 from cloud-fan/follow.
Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This removes the accidental duplicated test coverage.
### Why are the changes needed?
To save the test resources.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A because this is a removal of the duplicated test coverage.
Closes#32774 from dongjoon-hyun/SPARK-35589-3.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager reregistration while `StopExecutor` msg is in-flight.
Here on receiving `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from `blockManagerInfo` map, instead we mark it as dead by updating the corresponding `executorRemovalTs`. There's a separate cleanup thread running to periodically remove the stale `BlockManagerInfo` from `blockManangerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`, if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map would contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
### Why are the changes needed?
This changes are needed since BlockManager reregistration while executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers `AppStatusStore` to get the list of executors which returns the dead executor as alive.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Modified the existing unittests.
- Ran a simple test application on minikube that asserts on number of executors are zero once the executor idle timeout is reached.
Closes#32114 from sumeetgajjar/SPARK-35011.
Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
A test case of AdaptiveQueryExecSuite becomes flaky since there are too many debug logs in RootLogger:
https://github.com/Yikun/spark/runs/2715222392?check_suite_focus=truehttps://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139125/testReport/
To fix it, I suggest supporting multiple loggers in the testing method withLogAppender. So that the LogAppender gets clean target log outputs.
### Why are the changes needed?
Fix a flaky test case.
Also, reduce unnecessary memory cost in tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes#32725 from gengliangwang/fixFlakyLogAppender.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
This PR aims to make `BlockManagerMasterEndpoint.updateBlockInfo` not to ignore index-only shuffle files.
In addition, this PR fixes `IndexShuffleBlockResolver.getMigrationBlocks` to return data files first.
### Why are the changes needed?
When [SPARK-20629](a4ca355af8) introduced a worker decommission, index-only shuffle files are not considered properly.
- SPARK-33198 fixed `getMigrationBlocks` to handle index only shuffle files
- SPARK-35589 (this) aims to fix `updateBlockInfo` to handle index only shuffle files.
### Does this PR introduce _any_ user-facing change?
No. This is a bug fix.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes#32727 from dongjoon-hyun/SPARK-UPDATE-OUTPUT.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
For different UIs, e.g. History Server or Spark Live UI, maybe need different capabilities to handle HTTP requests. Usually, a History Server is for multi-users and needs more threads to increase concurrency, while Live UI is per application, which needn't that large pool size.
In this PR, we increase the max pool size of the History Server's jetty backend
### Why are the changes needed?
increase the client concurrency of HistoryServer
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
Closes#32539 from yaooqinn/SPARK-35402.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Fixed tooltip for "Storage" tab in UI
### Why are the changes needed?
Tooltip correction was needed
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Manually tested
Closes#32664 from lidiyag/storagewebui.
Authored-by: lidiyag <lidiya.nixon@huawei.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
```
- Upload multi stages *** FAILED ***
{{ The code passed to eventually never returned normally. Attempted 20 times over 10.011176743 seconds. Last failure message: fallbackStorage.exists(0, file) was false. (FallbackStorageSuite.scala:243)}}
```
The error like above was raised in aarch64 randomly and also in github action test[1][2].
[1] https://github.com/apache/spark/actions/runs/489319612
[2]https://github.com/apache/spark/actions/runs/479317320
### Why are the changes needed?
timeout is too short, need to increase to let test case complete.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.storage.FallbackStorageSuite -pl :spark-core_2.12
Closes#32719 from Yikun/SPARK-35584.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
After SPARK-29291 and SPARK-33352, there are still some compilation warnings about `procedure syntax is deprecated` as follows:
```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:723: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `registerMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:748: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `unregisterMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala:223: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testSimpleSpillingForAllCodecs`'s return type
[WARNING] [Warn] /spark/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASBenchmark.scala:53: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `runBLASBenchmark`'s return type
[WARNING] [Warn] /spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:110: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `assertEmptyRootPath`'s return type
[WARNING] [Warn] /spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:602: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `executeCTASWithNonEmptyLocation`'s return type
```
So the main change of this pr is cleanup these compilation warnings.
### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13 and this change should be compatible with Scala 2.12
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#32669 from LuciferYang/re-clean-procedure-syntax.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from 3.7.0-M5 to 3.7.0-M11
Note: json4s version greater than 3.7.0-M11 is not binary compatible with Spark third party jars
### Why are the changes needed?
Multiple defect fixes and improvements like
https://github.com/json4s/json4s/issues/750https://github.com/json4s/json4s/issues/554https://github.com/json4s/json4s/issues/715
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32636 from vinodkc/br_build_upgrade_json4s.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This PR is proposing a add-on to support to manual close entries in MemoryStore and InMemoryRelation
### What changes were proposed in this pull request?
Currently:
MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all OnHeap or OffHeap entries.
And when memoryStore.remove(blockId) is called, codes will simply remove one entry from LinkedHashMap and leverage Java GC to do release work.
This PR:
We are proposing a add-on to manually close any object stored in MemoryStore and InMemoryRelation if this object is extended from AutoCloseable.
Veifiication:
In our own use case, we implemented a user-defined off-heap-hashRelation for BHJ, and we verified that by adding this manual close, we can make sure our defined off-heap-hashRelation can be released when evict is called.
Also, we implemented user-defined cachedBatch and will be release when InMemoryRelation.clearCache() is called by this PR
### Why are the changes needed?
This changes can help to clean some off-heap user-defined object may be cached in InMemoryRelation or MemoryStore
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
WIP
Signed-off-by: Chendi Xue <chendi.xueintel.com>
Closes#32534 from xuechendi/support_manual_close_in_memorystore.
Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
When a memory reservation triggers a self-spill, `ExecutionMemoryPool#releaseMemory()` will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool, they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking it to spill again.
This PR adds logic to TaskMemoryManager to detect this case and retry.
### Why are the changes needed?
This bug affects queries with a MemoryConsumer that can spill part of its memory, such as BytesToBytesMap. If the MemoryConsumer is using all available memory and there is a waiting task, then attempting to acquire more memory on the MemoryConsumer will trigger a partial self-spill. However, because the waiting task gets priority, the attempt to acquire memory will fail even if it could have been satisfied by another spill.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test to MemoryManagerSuite that previously failed and now passes.
Closes#32625 from ankurdave/SPARK-35486.
Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
Print the invalid value in config validation error message for `checkValue` just like `checkValues`
### Why are the changes needed?
Invalid configuration values may come in many ways, this PR can help different kinds of users or developers to identify what the config the error is related to
### Does this PR introduce _any_ user-facing change?
yes, but only error msg
### How was this patch tested?
yes, modified tests
Closes#32600 from yaooqinn/SPARK-35456.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>