Commit graph

8050 commits

Author SHA1 Message Date
Jie ab80d3c167 [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
### What changes were proposed in this pull request?

1. add "closeStreams" to FileAppender and RollingFileAppender
2. set "closeStreams" to "true" in ExecutorRunner

### Why are the changes needed?

The executor will hang when due disk full or other exceptions which happened in writting to outputStream: the root cause is the "inputStream" is not closed after the error happens:
1. ExecutorRunner creates two files appenders for pipe: one for stdout, one for stderr
2. FileAppender.appendStreamToFile exits the loop when writing to outputStream
3. FileAppender closes the outputStream, but left the inputStream which refers the pipe's stdout and stderr opened
4. The executor will hang when printing the log message if the pipe is full (no one consume the outputs)
5. From the driver side, you can see the task can't be completed for ever

With this fix, the step 4 will throw an exception, the driver can catch up the exception and reschedule the failed task to other executors.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add new tests for the "closeStreams" in FileAppenderSuite

Closes #33263 from jhu-chang/SPARK-35027.

Authored-by: Jie <gt.hu.chang@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 1a8c6755a1)
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-07-20 21:24:37 -05:00
Ye Zhou 1907f0ac57 [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.

### Summary of the change:
When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt.

This PR also refactored the management of the merged shuffle information to avoid concurrency issues.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Closes #33078 from zhouyejoe/SPARK-35546.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c77acf0bbc)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
2021-07-20 00:04:16 -05:00
Dongjoon Hyun c3a23ce49b [SPARK-36193][CORE] Recover SparkSubmit.runMain not to stop SparkContext in non-K8s env
### What changes were proposed in this pull request?

According to the discussion on https://github.com/apache/spark/pull/32283 , this PR aims to limit the feature of SPARK-34674 to K8s environment only.

### Why are the changes needed?

To reduce the behavior change in non-K8s environment.

### Does this PR introduce _any_ user-facing change?

The change behavior is consistent with 3.1.1 and older Spark releases.

### How was this patch tested?

N/A

Closes #33403 from dongjoon-hyun/SPARK-36193.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit fd3e9ce0b9)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-18 22:26:31 -07:00
Chandni Singh 595e8251d1 [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
### What changes were proposed in this pull request?
Below 2 bugs were introduced with https://github.com/apache/spark/pull/32140
1. Instead of requesting the local-dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests it from other executors. Push-based shuffle is only enabled when the ESS is enabled so it should always fetch the dirs from the ESS and not from other executors which is not yet supported.
2. The size of the push-merged blocks is logged incorrectly.

### Why are the changes needed?
This fixes the above mentioned bugs and is needed for push-based shuffle to work properly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs` which is why didn't catch (1) with the UT. However, the fix is trivial and checking this in the UT will require a lot more effort so I haven't modified it in the UT.
Logs of the executor with the bug
```
21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger]
21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead
java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92.
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
```
After the fix, the executors were able to fetch the local push-merged blocks.

Closes #33378 from otterc/SPARK-32922-followup.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d2cbadcfe)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
2021-07-17 00:27:30 -05:00
yi.wu d5022c3c6f [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
### What changes were proposed in this pull request?

This is the initial work of add checksum support of shuffle. This is a piece of https://github.com/apache/spark/pull/32385. And this PR only adds checksum functionality at the shuffle writer side.

Basically, the idea is to wrap a `MutableCheckedOutputStream`* upon the `FileOutputStream` while the shuffle writer generating the shuffle data. But the specific wrapping places are a bit different among the shuffle writers due to their different implementation:

* `BypassMergeSortShuffleWriter` -  wrap on each partition file
* `UnsafeShuffleWriter` - wrap on each spill files directly since they doesn't require aggregation, sorting
* `SortShuffleWriter` - wrap on the `ShufflePartitionPairsWriter` after merged spill files since they might require aggregation, sorting

\* `MutableCheckedOutputStream` is a variant of `java.util.zip.CheckedOutputStream` which can change the checksum calculator at runtime.

And we use the `Adler32`, which uses the CRC-32 algorithm but much faster, to calculate the checksum as the same as `Broadcast`'s checksum.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, added a new conf: `spark.shuffle.checksum`.

### How was this patch tested?

Added unit tests.

Closes #32401 from Ngone51/add-checksum-files.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 4783fb72af)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
2021-07-17 00:24:03 -05:00
Karen Feng 8b35bc4d2b [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
### What changes were proposed in this pull request?

Adds error classes to some of the exceptions in QueryCompilationErrors.

### Why are the changes needed?

Improves auditing for developers and adds useful fields for users (error class and SQLSTATE).

### Does this PR introduce _any_ user-facing change?

Yes, fills in missing error class and SQLSTATE fields.

### How was this patch tested?

Existing tests and new unit tests.

Closes #33309 from karenfeng/group-compilation-errors-1.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit e92b8ea6f8)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-15 11:43:32 +09:00
Venkata krishnan Sowrirajan 5fa7855d10 [SPARK-32920][CORE][SHUFFLE][FOLLOW-UP] Fix to run push-based shuffle tests in DAGSchedulerSuite in ad-hoc manner
### What changes were proposed in this pull request?
Currently when the push-based shuffle tests are run in an ad-hoc manner through IDE, `spark.testing` is not set to true therefore `Utils#isPushBasedShuffleEnabled` returns false disabling push-based shuffle eventually causing the tests to fail. This doesn't happen when it is run on command line using maven as `spark.testing` is set to true.
Changes made - set `spark.testing` to true in `initPushBasedShuffleConfs`

### Why are the changes needed?
Fix to run DAGSchedulerSuite tests in ad-hoc manner

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
In my local IDE

Closes #33303 from venkata91/SPARK-32920-follow-up.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit fbf53dee37)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
2021-07-13 12:17:13 -05:00
Wenchen Fan 017b7d3f0b [SPARK-36074][SQL] Add error class for StructType.findNestedField
### What changes were proposed in this pull request?

This PR adds an INVALID_FIELD_NAME error class for the errors in `StructType.findNestedField`. It also cleans up the code there and adds UT for this method.

### Why are the changes needed?

follow the new error message framework

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33282 from cloud-fan/error.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 21:15:00 +08:00
Takuya UESHIN 55111cafd1 [SPARK-36062][PYTHON] Try to capture faulthanlder when a Python worker crashes
### What changes were proposed in this pull request?

Try to capture the error message from the `faulthandler` when the Python worker crashes.

### Why are the changes needed?

Currently, we just see an error message saying `"exited unexpectedly (crashed)"` when the UDFs causes the Python worker to crash by like segmentation fault.
We should take advantage of [`faulthandler`](https://docs.python.org/3/library/faulthandler.html) and try to capture the error message from the `faulthandler`.

### Does this PR introduce _any_ user-facing change?

Yes, when a Spark config `spark.python.worker.faulthandler.enabled` is `true`, the stack trace will be seen in the error message when the Python worker crashes.

```py
>>> def f():
...   import ctypes
...   ctypes.string_at(0)
...
>>> sc.parallelize([1]).map(lambda x: f()).count()
```

```
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x000000010965b5c0 (most recent call first):
  File "/.../ctypes/__init__.py", line 525 in string_at
  File "<stdin>", line 3 in f
  File "<stdin>", line 1 in <lambda>
...
```

### How was this patch tested?

Added some tests, and manually.

Closes #33273 from ueshin/issues/SPARK-36062/faulthandler.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 115b8a180f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-09 11:31:00 +09:00
Karen Feng f31cf163d9 [SPARK-35958][CORE] Refactor SparkError.scala to SparkThrowable.java
### What changes were proposed in this pull request?

Refactors the base Throwable trait `SparkError.scala` (introduced in SPARK-34920) an interface `SparkThrowable.java`.

### Why are the changes needed?

- Renaming `SparkError` to `SparkThrowable` better reflect sthat this is the base interface for both `Exception` and `Error`
- Migrating to Java maximizes its extensibility

### Does this PR introduce _any_ user-facing change?

Yes; the base trait has been renamed and the accessor methods have changed (eg. `sqlState` -> `getSqlState()`).

### How was this patch tested?

Unit tests.

Closes #33164 from karenfeng/SPARK-35958.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 71c086eb87)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-08 23:55:11 +08:00
Kevin Su dc85b0b51a [SPARK-35950][WEBUI] Failed to toggle Exec Loss Reason in the executors page
### What changes were proposed in this pull request?

Update the executor's page, so it can successfully hide the "Exec Loss Reason" column.

### Why are the changes needed?

When unselected the checkbox "Exec Loss Reason" on the executor page,
the "Active tasks" column disappears instead of the "Exec Loss Reason" column.

Before:
![Screenshot from 2021-06-30 15-55-05](https://user-images.githubusercontent.com/37936015/123930908-bd6f4180-d9c2-11eb-9aba-bbfe0a237776.png)
After:
![Screenshot from 2021-06-30 22-21-38](https://user-images.githubusercontent.com/37936015/123977632-bf042e00-d9f1-11eb-910e-93d615d2db47.png)

### Does this PR introduce _any_ user-facing change?

Yes, The Web UI is updated.

### How was this patch tested?

Pass the CIs.

Closes #33155 from pingsutw/SPARK-35950.

Lead-authored-by: Kevin Su <pingsutw@gmail.com>
Co-authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-01 12:32:54 +08:00
yi.wu 868a594706 [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
### What changes were proposed in this pull request?

This PR proposes to let `WorkerWatcher` reuse the `stopping` flag in `CoarseGrainedExecutorBackend` to avoid the duplicate call of `System.exit`.

### Why are the changes needed?

As a followup of https://github.com/apache/spark/pull/32868, this PR tries to give a more robust fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass existing tests.

Closes #33028 from Ngone51/spark-35714-followup.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wuyi <yi.wu@databricks.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
2021-07-01 11:40:00 +08:00
Karen Feng e3bd817d65 [SPARK-34920][CORE][SQL] Add error classes with SQLSTATE
### What changes were proposed in this pull request?

Unifies exceptions thrown from Spark under a single base trait `SparkError`, which unifies:
- Error classes
- Parametrized error messages
- SQLSTATE, as discussed in http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Add-error-IDs-td31126.html.

### Why are the changes needed?

- Adding error classes creates a consistent label for exceptions, even as error messages change
- Creating a single, centralized source-of-truth for parametrized error messages improves auditing for error message quality
- Adding SQLSTATE helps ODBC/JDBC users receive standardized error codes

### Does this PR introduce _any_ user-facing change?

Yes, changes ODBC experience by:
- Adding error classes to error messages
- Adding SQLSTATE to TStatus

### How was this patch tested?

Unit tests, as well as local tests with PyODBC.

Closes #32850 from karenfeng/SPARK-34920.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-30 09:22:02 +00:00
Cheng Su 6bbfb45ffe [SPARK-33298][CORE][FOLLOWUP] Add Unstable annotation to FileCommitProtocol
### What changes were proposed in this pull request?

This is the followup from https://github.com/apache/spark/pull/33012#discussion_r659440833, where we want to add `Unstable` to `FileCommitProtocol`, to give people a better idea of API.

### Why are the changes needed?

Make it easier for people to follow and understand code. Clean up code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests, as no real logic change.

Closes #33148 from c21/bucket-followup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-30 16:25:20 +09:00
Chandni Singh 9a5cd15e87 [SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data
### What changes were proposed in this pull request?
This is the shuffle fetch side change where executors can fetch local/remote push-merged shuffle data from shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
The change adds support to the `ShuffleBlockFetchIterator` to fetch push-merged block meta and shuffle chunks from local and remote ESS. If the fetch of any of these fails, then the iterator fallsback to fetch the original shuffle blocks that belonged to the push-merged block.

### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

### Does this PR introduce _any_ user-facing change?
When push-based shuffle is turned on then that will fetch push-merged blocks from the remote shuffle service. The client logs will indicate this.

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com

Closes #32140 from otterc/SPARK-32922.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: otterc <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-29 17:44:15 -05:00
Dongjoon Hyun 7e7028282c [SPARK-35928][BUILD] Upgrade ASM to 9.1
### What changes were proposed in this pull request?

This PR aims to upgrade ASM to 9.1

### Why are the changes needed?

The latest `xbean-asm9-shaded` is built with ASM 9.1.

- https://mvnrepository.com/artifact/org.apache.xbean/xbean-asm9-shaded/4.20
- 5e0e3c0c64/pom.xml (L67)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #33130 from dongjoon-hyun/SPARK-35928.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-29 10:27:51 -07:00
Kent Yao 9c157a490b [SPARK-35910][CORE][SHUFFLE] Update remoteBlockBytes based on merged block info to reduce task time
### What changes were proposed in this pull request?

Currently, we calculate the `remoteBlockBytes` based on the original block info list. It's not efficient. Usually, it costs more ~25% time to be spent here.

If the original reducer size is big but the actual reducer size is small due to automatically partition coalescing of AQE, the reducer will take more time to calculate `remoteBlockBytes`.

We can reduce this cost via remote requests which contain merged block info lists.

### Why are the changes needed?

improve task performance

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new unit tests and verified manually.

Closes #33109 from yaooqinn/SPARK-35910.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-28 13:55:59 -07:00
Erik Krogen 3255511d52 [SPARK-35258][SHUFFLE][YARN] Add new metrics to ExternalShuffleService for better monitoring
### What changes were proposed in this pull request?
This adds two new additional metrics to `ExternalBlockHandler`:
- `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them
- `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS

Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`.

### Why are the changes needed?
Currently `ExternalBlockHandler` exposes some useful metrics, but is lacking around metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs.

For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves are collected detailed histograms of timing information. We should expose this information for better observability.

### Does this PR introduce _any_ user-facing change?
Yes, there are two entirely new metrics for the ESS, as documented in `monitoring.md`. Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information.

### How was this patch tested?
New unit tests are added to verify that new metrics are showing up as expected.

We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues.

Closes #32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-28 02:36:17 -05:00
Kent Yao 14d4decf73 [SPARK-35879][CORE][SHUFFLE] Fix performance regression caused by collectFetchRequests
### What changes were proposed in this pull request?

This PR fixes perf regression at the executor side when creating fetch requests with large initial partitions

![image](https://user-images.githubusercontent.com/8326978/123270865-dd21e800-d532-11eb-8447-ad80e47b034f.png)

In NetEase, we had an online job that took `45min` to "fetch" about 100MB of shuffle data, which actually turned out that it was just collecting fetch requests slowly. Normally, such a task should finish in seconds.

See the `DEBUG` log

```
21/06/22 11:52:26 DEBUG BlockManagerStorageEndpoint: Sent response: 0 to kyuubi.163.org:
21/06/22 11:53:05 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3941440 at BlockManagerId(12, .., 43559, None) with 19 blocks
21/06/22 11:53:44 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3716400 at BlockManagerId(20, .., 38287, None) with 18 blocks
21/06/22 11:54:41 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 4559280 at BlockManagerId(6, .., 39689, None) with 22 blocks
21/06/22 11:55:08 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3120160 at BlockManagerId(33, .., 39449, None) with 15 blocks
```

I also create a test case locally with my local laptop docker env to give some reproducible cases.

```
bin/spark-sql --conf spark.kubernetes.file.upload.path=./ --master k8s://https://kubernetes.docker.internal:6443 --conf spark.kubernetes.container.image=yaooqinn/spark:v20210624-5 -c spark.kubernetes.context=docker-for-desktop_1 --num-executors 5 --driver-memory 5g --conf spark.kubernetes.executor.podNamePrefix=sparksql
```

```sql
 SET spark.sql.adaptive.enabled=true;
 SET spark.sql.shuffle.partitions=3000;
 SELECT /*+ REPARTITION */ 1 as pid, id from range(1, 1000000, 1, 500);
 SELECT /*+ REPARTITION(pid, id) */ 1 as pid, id from range(1, 1000000, 1, 500);
 ```

### Why are the changes needed?

fix perf regression which was introduced by SPARK-29292 (3ad4863673) in v3.1.0.

3ad4863673 is for support compilation with scala 2.13 but the performance losses is huge. We need to consider backporting this PR to branch 3.1.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Mannully,

#### before
```log
 21/06/23 13:54:22 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
 21/06/23 13:54:38 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2314708 at BlockManagerId(2, 10.1.3.114, 36423, None) with 86 blocks
 21/06/23 13:54:59 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2636612 at BlockManagerId(3, 10.1.3.115, 34293, None) with 87 blocks
 21/06/23 13:55:18 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2508706 at BlockManagerId(4, 10.1.3.116, 41869, None) with 90 blocks
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2350854 at BlockManagerId(5, 10.1.3.117, 45787, None) with 85 blocks
 21/06/23 13:55:34 INFO ShuffleBlockFetcherIterator: Getting 438 (11.8 MiB) non-empty blocks including 90 (2.5 MiB) local and 0 (0.0 B) host-local and 348 (9.4 MiB) remote blocks
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 87 blocks (2.5 MiB) from 10.1.3.115:34293
 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.115:34293 after 1 ms (0 ms spent in bootstraps)
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 90 blocks (2.4 MiB) from 10.1.3.116:41869
 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.116:41869 after 2 ms (0 ms spent in bootstraps)
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 85 blocks (2.2 MiB) from 10.1.3.117:45787
 ```
```log
 21/06/23 14:00:45 INFO MapOutputTracker: Broadcast outputstatuses size = 411, actual size = 828997
 21/06/23 14:00:45 INFO MapOutputTrackerWorker: Got the map output locations
 21/06/23 14:00:45 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
 21/06/23 14:00:55 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1894389 at BlockManagerId(2, 10.1.3.114, 36423, None) with 99 blocks
 21/06/23 14:01:04 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1919993 at BlockManagerId(3, 10.1.3.115, 34293, None) with 100 blocks
 21/06/23 14:01:14 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1977186 at BlockManagerId(5, 10.1.3.117, 45787, None) with 103 blocks
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1938336 at BlockManagerId(4, 10.1.3.116, 41869, None) with 101 blocks
 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Getting 500 (9.1 MiB) non-empty blocks including 97 (1820.3 KiB) local and 0 (0.0 B) host-local and 403 (7.4 MiB) remote blocks
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 101 blocks (1892.9 KiB) from 10.1.3.116:41869
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 103 blocks (1930.8 KiB) from 10.1.3.117:45787
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 99 blocks (1850.0 KiB) from 10.1.3.114:36423
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 100 blocks (1875.0 KiB) from 10.1.3.115:34293
 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 37889 ms
 ```

#### After

```log
21/06/24 13:01:16 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call blockInfos.map(_._2).sum: 40 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_9_2990_2997/9: 0 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_15_2395_2997/15: 0 ms
```

Closes #33063 from yaooqinn/SPARK-35879.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
2021-06-26 12:48:24 +08:00
Yuanjian Li 0c31137172 [SPARK-35628][SS][FOLLOW-UP] Fix the consistent break on Scala 2.13 build
### What changes were proposed in this pull request?
Fix the consistent break on Scala 2.13 build caused by the PR https://github.com/apache/spark/pull/32767

### Why are the changes needed?
Fix the consistent break.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #33084 from xuanyuanking/SPARK-35628-follow.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-25 07:08:03 -07:00
Erik Krogen 866df69c62 [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line
### What changes were proposed in this pull request?
Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent with the existing behavior where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrainedExecutorBackend` overrides this to construct the user classpath from the existing `APP_JAR` and `SECONDARY_JARS` configs.

### Why are the changes needed?
User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor which then creates a classloader out of the URLs. Currently in the case of YARN, this list of JARs is crafted by the Driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-path /path/to/myjar.jar`. This can cause extremely long argument lists when there are many JARs, which can cause the OS argument length to be exceeded, typically manifesting as the error message:

> /bin/bash: Argument list too long

A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list using the configurations instead resolves this issue.

### Does this PR introduce _any_ user-facing change?
No, except for fixing the bug, allowing for larger JAR lists to be passed successfully. Configuration of JARs is identical to before.

### How was this patch tested?
New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success.

Closes #32810 from xkrogen/xkrogen-SPARK-35672-classpath-scalable.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-06-25 08:53:57 -05:00
Steve Loughran 36aaaa14c3 [SPARK-35878][CORE] Add fs.s3a.endpoint if unset and fs.s3a.endpoint.region is null
### What changes were proposed in this pull request?

This patches the hadoop configuration so that fs.s3a.endpoint is set to
s3.amazonaws.com if neither it nor fs.s3a.endpoint.region is set.

This stops S3A Filesystem creation failing with the error
"Unable to find a region via the region provider chain."
in some non-EC2 deployments.

See: HADOOP-17771.

when spark options are propagated to the hadoop configuration
in SparkHadoopUtils. the fs.s3a.endpoint value is set to
"s3.amazonaws.com" if unset and no explicit region
is set in fs.s3a.endpoint.region.

### Why are the changes needed?

A regression in Hadoop 3.3.1 has surfaced which causes S3A filesystem
instantiation to fail outside EC2 deployments if the host lacks
a CLI configuration in ~/.aws/config declaring the region, or
the `AWS_REGION` environment variable

HADOOP-17771 fixes this in Hadoop-3.3.2+, but
this spark patch will correct the behavior when running
Spark with the 3.3.1 artifacts.

It is harmless for older versions and compatible
with hadoop releases containing the HADOOP-17771
fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests to verify propagation logic from spark conf to hadoop conf.

Closes #33064 from steveloughran/SPARK-35878-regions.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-25 05:24:55 -07:00
Yuanjian Li f2029e7442 [SPARK-35628][SS] RocksDBFileManager - load checkpoint from DFS
### What changes were proposed in this pull request?
The implementation for the load operation of RocksDBFileManager.

### Why are the changes needed?
Provide the functionality of loading all necessary files for specific checkpoint versions from DFS to the given local directory.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #32767 from xuanyuanking/SPARK-35628.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-25 18:38:26 +09:00
Cheng Su 2da42ca3b4 [SPARK-33298][CORE] Introduce new API to FileCommitProtocol allow flexible file naming
### What changes were proposed in this pull request?

This PR is to introduce a new sets of APIs `newTaskTempFile` and `newTaskTempFileAbsPath` inside `FileCommitProtocol`, to allow more flexible file naming of Spark output. The major change is to pass `FileNameSpec` into `FileCommitProtocol`, instead of original `ext` (currently having `prefix` and `ext`), to allow individual `FileCommitProtocol` implementation comes up with more flexible file names (e.g. has a custom `prefix`) for Hive/Presto bucketing - https://github.com/apache/spark/pull/30003. Provide a default implementations of the added APIs, so all existing implementation of `FileCommitProtocol` is NOT being broken.

### Why are the changes needed?

To make commit protocol more flexible in terms of Spark output file name.
Pre-requisite of https://github.com/apache/spark/pull/30003.

### Does this PR introduce _any_ user-facing change?

Yes for developers  who implement/run custom implementation of `FileCommitProtocol`. They can choose to implement for the newly added API.

### How was this patch tested?

Existing unit tests as this is just adding an API.

Closes #33012 from c21/commit-protocol-api.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-24 17:10:54 -07:00
Vinod KC 4dabba8f76 [SPARK-35747][CORE] Avoid printing full Exception stack trace, if Hbase/Kafka/Hive services are not running in a secure cluster
### What changes were proposed in this pull request?
In a secure Yarn cluster, even though HBase or Kafka, or Hive services are not used in the user application, yarn client unnecessarily trying to generate  Delegations token from these services. This will add additional delays while submitting spark application in a yarn cluster

 Also during HBase delegation token generation step in the application submit stage,  HBaseDelegationTokenProvider prints a full Exception Stack trace and it causes a noisy warning.
 Apart from printing exception stack trace, Application submission taking more time as it retries connection to HBase master multiple times before it gives up. So, if HBase is not used in the user Applications, it is better to suggest User disable HBase Delegation Token generation.

 This PR aims to avoid printing full Exception Stack by just printing just Exception name and also add a suggestion message to disable `Delegation Token generation` if service is not used in the Spark Application.

 eg: `If HBase is not used, set spark.security.credentials.hbase.enabled to false`

### Why are the changes needed?

To avoid printing full Exception stack trace in WARN log
#### Before the fix
----------------
```
spark-shell --master yarn
.......
.......
21/06/12 14:29:41 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokensWithHBaseConn(HBaseDelegationT
okenProvider.scala:93)
        at org.apache.spark.deploy.security.HBaseDelegationTokenProvider.obtainDelegationTokens(HBaseDelegationTokenProvider.
scala:60)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
166)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$6.apply(HadoopDelegationTokenManager.scala:
164)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.
scala:164)
```

#### After  the fix
------------
```
 spark-shell --master yarn

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/13 02:10:02 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase due to  java.lang.reflect.InvocationTargetException Retrying to fetch HBase security token with hbase connection parameter.
21/06/13 02:10:40 WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false
21/06/13 02:10:47 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
```
### Does this PR introduce _any_ user-facing change?

Yes, in the log, it avoids printing full Exception stack trace.
Instread prints this.
**WARN security.HBaseDelegationTokenProvider: Failed to get token from service hbase java.lang.reflect.InvocationTargetException. If HBase is not used, set spark.security.credentials.hbase.enabled to false**

### How was this patch tested?

Tested manually as it can be verified only in a secure cluster

Closes #32894 from vinodkc/br_fix_Hbase_DT_Exception_stack_printing.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-23 23:12:02 -07:00
Dongjoon Hyun af9b47f8f8 [SPARK-35868][CORE] Add fs.s3a.downgrade.syncable.exceptions if not set
### What changes were proposed in this pull request?

This PR aims to add `fs.s3a.downgrade.syncable.exceptions=true` if it's not provided by the users.

### Why are the changes needed?

Currently, event log feature is broken with Hadoop 3.2 profile due to `UnsupportedOperationException` because [HADOOP-17597](https://issues.apache.org/jira/browse/HADOOP-17597) changes the default behavior to throw exceptions by default since Apache Hadoop 3.3.1. We know that it's because `EventLogFileWriters` is using `hadoopDataStream.foreach(_.hflush())`, but this PR aims to provide the same UX across Spark distributions with Hadoop2/Hadoop 3 at Apache Spark 3.2.0.
```
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/
...
21/06/23 17:34:35 ERROR SparkContext: Error initializing SparkContext.
java.lang.UnsupportedOperationException: S3A streams are not Syncable. See HADOOP-17597.
```

### Does this PR introduce _any_ user-facing change?

Yes, this will recover the existing behavior.

### How was this patch tested?

Manual.
```
$ build/sbt package -Phadoop-3.2 -Phadoop-cloud
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/
...(working)...
```

If the users provide the configuration explicitly, it will return to the original behavior throwing exceptions.
```
$ bin/spark-shell -c spark.eventLog.enabled=true -c spark.eventLog.dir=s3a://dongjoon/spark-events/ -c spark.hadoop.fs.s3a.downgrade.syncable.exceptions=false
...
21/06/23 17:44:41 ERROR Main: Failed to initialize Spark session.
java.lang.UnsupportedOperationException: S3A streams are not Syncable. See HADOOP-17597.
```

Closes #33044 from dongjoon-hyun/SPARK-35868.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-23 22:46:36 -07:00
attilapiros 0bdece015e [SPARK-35543][CORE][FOLLOWUP] Fix memory leak in BlockManagerMasterEndpoint removeRdd
### What changes were proposed in this pull request?

Wrapping `JHashMap[BlockId, BlockStatus]` (used in `blockStatusByShuffleService`) into a new class `BlockStatusPerBlockId` which removes the reference to the map when all the persisted blocks are removed.

### Why are the changes needed?

With https://github.com/apache/spark/pull/32790 a bug is introduced when all the persisted blocks are removed we remove the HashMap which already shared by the block manger infos but when new block is persisted this map is needed to be used again for storing the data (and this HashMap must be the same which shared by the block manger infos created for registered block managers running on the same host where the external shuffle service is).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Extending `BlockManagerInfoSuite` with test which removes all the persisted blocks then adds another one.

Closes #33020 from attilapiros/SPARK-35543-2.

Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-24 00:01:40 -05:00
yi.wu 7f937730ff [SPARK-33741][FOLLOW-UP][CORE] Rename the min threshold time speculation config
### What changes were proposed in this pull request?

This's a follow-up of https://github.com/apache/spark/pull/30710.
Rename the conf from `spark.speculation.min.threshold` to `spark.speculation.minTaskRuntime`.

### Why are the changes needed?

To follow the [config naming policy](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala#L21).

### Does this PR introduce _any_ user-facing change?

No (since Spark 3.2 hasn't been released).

### How was this patch tested?

Pass existing tests.

Closes #33037 from Ngone51/spark-33741-followup.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-23 13:29:58 +00:00
Chandni Singh 1fe6daa002 [SPARK-35836][SHUFFLE][CORE] Removed the reference to spark.shuffle.push.based.enabled in ShuffleBlockPusherSuite
### What changes were proposed in this pull request?
It is a trivial change to remove the reference to an incorrect configuration for push-based shuffle from a test suite.
Ref: https://github.com/apache/spark/pull/30312
With SPARK-32917, `ShuffleBlockPusher` and its test suite was introduced. `ShuffleBlockPusher` is created only when push-based shuffle is enabled and the tests in `ShuffleBlockPusherSuite` are just testing the functionality in the pusher. So there is no need to have these configs enabled in these test.

### Why are the changes needed?
This change removes an incorrect configuration from the test suite.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
This change just removes an incorrect configuration from the test suite so haven't added any UTs for it.

Closes #32992 from otterc/SPARK-35836.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-21 13:57:55 -05:00
Vasily Kolpakov 844f10c742 [SPARK-35391] Fix memory leak in ExecutorAllocationListener
### What changes were proposed in this pull request?
This PR fixes a memory leak in ExecutorAllocationListener.

### Why are the changes needed?
Dynamic allocation stops working under high load (~100 tasks/s, ~5 stages/s) in long-lived (~10 days) spark applications. This PR addresses the problem.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual tests. The patch fixed dynamic allocation in production cluster.

Closes #32526 from VasilyKolpakov/SPARK-35391_fix_ExecutorAllocationListener.

Authored-by: Vasily Kolpakov <vasilykolpakov@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-06-21 08:23:20 -05:00
Chandni Singh 8ce1e344e5 [SPARK-35671][SHUFFLE][CORE] Add support in the ESS to serve merged shuffle block meta and data to executors
### What changes were proposed in this pull request?
This adds support in the ESS to serve merged shuffle block meta and data requests to executors.
This change is needed for fetching remote merged shuffle data from the remote shuffle services. This is part of push-based shuffle SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

This change introduces new messages between clients and the external shuffle service:

1. `MergedBlockMetaRequest`: The client sends this to external shuffle to get the meta information for a merged block. The response to this is one of these :
  - `MergedBlockMetaSuccess` : contains request id, number of chunks, and a `ManagedBuffer` which is a `FileSegmentBuffer` backed by the merged block meta file.
  - `RpcFailure`: this is sent back to client in case of failure. This is an existing message.

2. `FetchShuffleBlockChunks`: This is similar to `FetchShuffleBlocks` message but it is to fetch merged shuffle chunks instead of blocks.

### Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #32811 from otterc/SPARK-35671.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-20 17:22:37 -05:00
Dongjoon Hyun 4f51e0045e [SPARK-35832][CORE][ML][K8S][TESTS] Add LocalRootDirsTest trait
### What changes were proposed in this pull request?

To make the test suite more robust, this PR aims to add a new trait, `LocalRootDirsTest`, by refactoring `SortShuffleSuite`'s helper functions and applying it to the following:
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
- KubernetesLocalDiskShuffleDataIOSuite
- LocalDirsSuite
- RDDCleanerSuite
- ALSCleanerSuite

In addition, this fixes a UT in `KubernetesLocalDiskShuffleDataIOSuite`.

### Why are the changes needed?

`ShuffleSuite` is extended by four classes but only `SortShuffleSuite` does the clean-up correctly.
```
ShuffleSuite
- SortShuffleSuite
- ShuffleNettySuite
- ShuffleOldFetchProtocolSuite
- ExternalShuffleServiceSuite
```

Since `KubernetesLocalDiskShuffleDataIOSuite` is looking for the other storage directory, the leftover of `ShuffleSuite` causes flakiness.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2/2649/testReport/junit/org.apache.spark.shuffle/KubernetesLocalDiskShuffleDataIOSuite/recompute_is_not_blocked_by_the_recovery/
```
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 1.0 (TID 3) had a not serializable result: org.apache.spark.ShuffleSuite$NonJavaSerializableClass
...
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIOSuite.$anonfun$new$2(KubernetesLocalDiskShuffleDataIOSuite.scala:52)
```

For the other suites, the clean-up implementation is used but not complete. So, they are refactored to use new trait.

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

Closes #32986 from dongjoon-hyun/SPARK-35832.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-20 10:53:53 -07:00
toujours33 d015eff16d [SPARK-35796][TESTS] Fix SparkSubmitSuite failure on MacOS 10.15+
### What changes were proposed in this pull request?
Change primaryResource assertion from exact match to suffix match in case SparkSubmitSuite.`handles k8s cluster mode`

### Why are the changes needed?
When I run SparkSubmitSuite on MacOs 10.15.7, I got AssertionError for `handles k8s cluster mode` test after pr [SPARK-35691](https://issues.apache.org/jira/browse/SPARK-35691), due to `File(path).getCanonicalFile().toURI()` function  with absolute path as parameter will return path begin with `/System/Volumes/Data` on MacOs higher tha 10.15.
eg.  `/home/testjars.jar` will get `file:/System/Volumes/Data/home/testjars.jar`

In order to pass UT on MacOs higher than 10.15, we change the assertion into suffix match

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
1. Pass the GitHub Action
2. Manually test
    - environment: MacOs > 10.15
    - commad: `build/mvn clean install -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -pl core -am -DwildcardSuites=org.apache.spark.deploy.SparkSubmitSuite -Dtest=none`
    - Test result:
        - before this pr, case failed with following exception:
        `- handles k8s cluster mode *** FAILED ***
  Some("file:/System/Volumes/Data/home/thejar.jar") was not equal to Some("file:/home/thejar.jar") (SparkSubmitSuite.scala:485)
  Analysis:
  Some(value: "file:/[System/Volumes/Data/]home/thejar.jar" -> "file:/[]home/thejar.jar")`
        - after this pr, run all test successfully

Closes #32948 from toujours33/SPARK-35796.

Authored-by: toujours33 <wangyazhi@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-18 17:48:49 -07:00
HyukjinKwon 41af409b7b [SPARK-35303][PYTHON] Enable pinned thread mode by default
### What changes were proposed in this pull request?

PySpark added pinned thread mode at https://github.com/apache/spark/pull/24898 to sync Python thread to JVM thread. Previously, one JVM thread could be reused which ends up with messed inheritance hierarchy such as thread local especially when multiple jobs run in parallel. To completely fix this, we should enable this mode by default.

### Why are the changes needed?

To correctly support parallel job submission and management.

### Does this PR introduce _any_ user-facing change?

Yes, now Python thread is mapped to JVM thread one to one.

### How was this patch tested?

Existing tests should cover it.

Closes #32429 from HyukjinKwon/SPARK-35303.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-18 12:02:29 +09:00
David Christle 7fcb127674 [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-2
### What changes were proposed in this pull request?
This PR aims to upgrade `zstd-jni` to 1.5.0-2, which uses `zstd` version 1.5.0.

### Why are the changes needed?
Major improvements to Zstd support are targeted for the upcoming 3.2.0 release of Spark. Zstd 1.5.0 introduces significant compression (+25% to 140%) and decompression (~15%) speed improvements in benchmarks described in more detail on the releases page:

- https://github.com/facebook/zstd/releases/tag/v1.5.0

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Build passes build tests, but the benchmark tests seem flaky. I am unsure if this change is responsible. The error is:
```
Running org.apache.spark.rdd.CoalescedRDDBenchmark:
21/06/08 18:53:10 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
```

https://github.com/dchristle/spark/runs/2776123749?check_suite_focus=true

cc: dongjoon-hyun

Closes #32826 from dchristle/ZSTD150.

Lead-authored-by: David Christle <dchristle@squareup.com>
Co-authored-by: David Christle <dchristle@users.noreply.github.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-17 11:06:50 -07:00
Angerszhuuuu 79362c4efc [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of driver appropriately when spark.eventLog.logStageExecutorMetrics is true
### What changes were proposed in this pull request?
In current EventLoggingListener, we won't write SparkListenerExecutorMetricsUpdate message to event log file at all

```
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
  if (shouldLogStageExecutorMetrics) {
    event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
      liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
        // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
        // so record those peaks for all active stages.
        // Otherwise, record the peaks for the matching stage.
        if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
          val metrics = metricsPerExecutor.getOrElseUpdate(
            event.execId, new ExecutorMetrics())
          metrics.compareAndUpdatePeakValues(newPeaks)
        }
      }
    }
  }
}
```

In history server's restful API about executor, we can get Executor's metrics but can't get all driver's metrics. Executor's executor metrics can be updated with TaskEnd event etc...

So in this pr, I add support to log SparkListenerExecutorMetricsUpdateEvent of `driver` when `spark.eventLog.logStageExecutorMetrics` is true.

### Why are the changes needed?
Make user can got driver's peakMemoryMetrics in SHS.

### Does this PR introduce _any_ user-facing change?
 user can got driver's executor metrics in SHS's restful API.

### How was this patch tested?
Mannul test

Closes #31992 from AngersZhuuuu/SPARK-34898.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-17 12:08:10 -05:00
yi.wu 509c076bc0 [SPARK-34054][CORE] BlockManagerDecommissioner code cleanup
### What changes were proposed in this pull request?

This PR cleans up the code of `BlockManagerDecommissioner`. It includes a few changes:

* Only create `BlockManagerDecommissioner` instance when shuffle or RDD blocks requires migration:
   there's no need to create `BlockManagerDecommissioner` instance if only `STORAGE_DECOMMISSION_ENABLED=true` and to check blocks migration in `shutdownThread`.

* Shut down the migration thread more gracefully:

  1. we'd better not log errors if the `BlockManagerDecommissioner.stop()` is invoked explicitly. But currently, users will see
    <details>

      <summary>error message</summary>

    ```
    21/01/04 20:11:52 ERROR BlockManagerDecommissioner: Error while waiting for block to migrate
    java.lang.InterruptedException: sleep interrupted
	    at java.lang.Thread.sleep(Native Method)
	    at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:83)
	    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
	    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	    at java.lang.Thread.run(Thread.java:748)
    ```
    </details>

   2. don't shut down a thread pool like below since `shutdown()` actually doesn't block to wait for running tasks finish:

      ```scala
      executor.shutdown()
      executor.shutdownNow()
      ```

* Avoid initiating `shuffleMigrationPool` when it's unnecessary:
 Currently, it's always initiated even if shuffle block migration is disabled. (`BlockManagerDecommissioner.stop()` -> `stopOffloadingShuffleBlocks()` -> initiate `shuffleMigrationPool`)

* Unify the terminologies between `offload` and `migrate`:
  replace `offload` with `migrate`

* Do not add back the shuffle blocks when it exceeds the max failure number:
   this avoids unnecessary operations

* Do not try `decommissionRddCacheBlocks()` if we already know there are no available peers

* Clean up logs:
   Currently, we have many different description for the same thing, which is not good for the user experience

* Other cleanups

### Why are the changes needed?

code clean up

### Does this PR introduce _any_ user-facing change?

Yes, users will not see misleading logs, e.g., the interrupted error.

### How was this patch tested?

Update a unite test since we change the behavior of creating the `BlockManagerDecommissioner` instance.

Other changes are only code cleanup so they won't cause behaviour change. So passing the existing tests should be enough.

Closes #31102 from Ngone51/stop-decommission-gracefully.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 15:00:18 +00:00
Venkata krishnan Sowrirajan aaa8a80c9d [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes
### What changes were proposed in this pull request?
Cache commonly occurring duplicate Some objects in SQLMetrics by using a Guava cache and reusing the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also with AccumulatorV2 we have seen lot of Some(-1L) and Some(0L) occurrences in a heap dump that is naively interned by having reusing a already constructed Some(-1L) and Some(0L)

To give some context on the impact and the garbage got accumulated, below are the details of the complex spark job which we troubleshooted and figured out the bottlenecks. **tl;dr - In short, major issues were the accumulation of duplicate objects mainly from SQLMetrics.**

Greater than 25% of the 40G driver heap filled with (a very large number of) **duplicate**, immutable objects.

1. Very large number of **duplicate** immutable objects.

- Type of metric is represented by `'scala.Some("sql")'` - which is created for each metric.
- Fixing this reduced memory usage from 4GB to a few bytes.

2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate absence of metric)

- Individually the values are all immutable, but spark sql was creating a new instance each time.
- Intern'ing these resulted in saving ~4.5GB for a 40G heap.

3. Using string interpolation for metric names.

- Interpolation results in creation of a new string object.
- We end up with a very large number of metric names - though the number of unique strings is miniscule.
- ~7.5 GB in the 40 GB heap : which went down to a few KB's when fixed.

### Why are the changes needed?
To reduce overall driver memory footprint which eventually reduces the Full GC pauses.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since these are memory related optimizations, unit tests are not added. These changes are added in our internal platform which made it possible for one of the complex spark job continuously failing to succeed along with other set of optimizations.

Closes #32754 from venkata91/SPARK-35613.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-15 22:02:19 -05:00
Kevin Su ac228d43bc [SPARK-35691][CORE] addFile/addJar/addDirectory should put CanonicalFile
### What changes were proposed in this pull request?

`addFile/addJar/addDirectory` should put CanonicalFile

### Why are the changes needed?

I met the error below.

21/06/07 00:06:57 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-
core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar But actually, /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar* and * /*home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar are the same*.

But actually, `/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar`and `/home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar` are the same.

I think we should put the Canonical File in ConcurrentHashMap.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the CIs.

Closes #32845 from pingsutw/SPARK-35691.

Authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-06-16 09:39:37 +09:00
Kun Wan 69aa7ad11f [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
### What changes were proposed in this pull request?

Bug fix for deadlock during the executor shutdown

### Why are the changes needed?

When a executor received a TERM signal, it (the second TERM signal) will lock java.lang.Shutdown class and then call Shutdown.exit() method to exit the JVM.
Shutdown will call SparkShutdownHook to shutdown the executor.
During the executor shutdown phase, RemoteProcessDisconnected event will be send to the RPC inbox, and then WorkerWatcher will try to call System.exit(-1) again.
Because java.lang.Shutdown has already locked, a deadlock has occurred.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test case "task reaper kills JVM if killed tasks keep running for too long" in JobCancellationSuite

Closes #32868 from wankunde/SPARK-35714.

Authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-06-13 16:01:00 -05:00
shahid 450b415028 [SPARK-35746][UI] Fix taskid in the stage page task event timeline
### What changes were proposed in this pull request?
Task id is given incorrect in the timeline plot in Stage Page

### Why are the changes needed?
Map event timeline plots to correct task
**Before:**
![image](https://user-images.githubusercontent.com/23054875/121761077-81775800-cb4b-11eb-8ec6-ee71926a6549.png)

**After**
![image](https://user-images.githubusercontent.com/23054875/121761195-02ceea80-cb4c-11eb-8ce6-07bb1cca190e.png)
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested

Closes #32888 from shahidki31/shahid/fixtaskid.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-06-12 15:38:41 +09:00
Dongjoon Hyun cf07036d9b [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
### What changes were proposed in this pull request?

Previously, the following two commits allow driver-owned on-demand PVC reuse.
- SPARK-35182 Support driver-owned on-demand PVC
- SPARK-35416 Support PersistentVolumeClaim Reuse

This PR aims to recover the shuffle data on those remounted PVCs. The lifecycle of PVCs are tied to the one of Spark jobs. Since this is K8s specific feature, `ShuffleDataIO` plugin is used.

### Why are the changes needed?

Although Pod is killed, we can remount PVCs and recover some data from it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the newly added test cases.

Closes #32730 from dongjoon-hyun/SPARK-RECOVER-SHUFFLE-DATA.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-10 16:06:58 -07:00
Ye Zhou a97885bb2c [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.

### Summary of changes:
Executor will create the merge directories under the application temp directory provided by YARN. The access control of the folder will be set to 770, where Shuffle Service can create merged shuffle files and write merge shuffle data in to those files.

Serve the merged shuffle blocks fetch request, read the merged shuffle blocks.

### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com

Closes #32007 from zhouyejoe/SPARK-33350.

Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-10 16:57:46 -05:00
Venkata krishnan Sowrirajan b5a1503585 [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage
### What changes were proposed in this pull request?

Summary of the changes made as part of this PR:

1. `DAGScheduler` changes to finalize a ShuffleMapStage which involves talking to all the shuffle mergers (`ExternalShuffleService`) and getting all the completed merge statuses.
2. Once the `ShuffleMapStage` finalization is complete, mark the `ShuffleMapStage` to be finalized which marks the stage as complete and subsequently letting the child stage start.
3. Also added the relevant tests to `DAGSchedulerSuite` for changes made as part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919)

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Venkata krishnan Sowrirajan vsowrirajanlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com

### Why are the changes needed?

Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests to DAGSchedulerSuite

Closes #30691 from venkata91/SPARK-32920.

Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-10 13:06:15 -05:00
Wenchen Fan 224ebae273 [SPARK-35661][SQL] Allow deserialized off-heap memory entry
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32534

#32534 proposed a use case to use `DeserializedMemoryEntry` to store off-heap data, and let Spark release the memory via the `AutoCloseable` interface. However, there is one more problem: `DeserializedMemoryEntry` always reports its size as on-heap size, which is inaccurate. If the Spark cluster is configured with small on-heap size and large off-heap size, this will trigger a lot of spilling.

This PR makes `DeserializedMemoryEntry` truly support off-heap data. Now the caller side can cache off-heap data with a new storage level `OFF_HEAP_ONLY_DESER`.

### Why are the changes needed?

correct the memory counting for off-heap data.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test

Closes #32800 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-09 14:01:12 +00:00
Yuanjian Li 9f010a8eb2 [SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS
### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.

### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #32582 from xuanyuanking/SPARK-35436.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-09 14:09:28 +09:00
dgd-contributor 6c3b7f92cf [SPARK-35074][CORE] hardcoded configs move to config package
### What changes were proposed in this pull request?
Currently spark.jars.xxx property keys (e.g. spark.jars.ivySettings and spark.jars.packages) are hardcoded in multiple places within Spark code across multiple modules. We should define them in config/package.scala and reference them in all other places.

### Why are the changes needed?
improvement

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
no

Closes #32746 from dgd-contributor/SPARK-35074_configs_should_be_moved_to_config_package.scala.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-06-07 09:55:03 -05:00
attilapiros 4534c0c4df [SPARK-35543][CORE] Fix memory leak in BlockManagerMasterEndpoint removeRdd
### What changes were proposed in this pull request?

In `BlockManagerMasterEndpoint` for the disk persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled) we are keeping track the block status entries by external shuffle service instances (so on YARN we are basically keeping them by nodes). This is the `blockStatusByShuffleService` member val. And when all the RDD blocks are removed for one external shuffle service instance then the key and the empty map can be removed from `blockStatusByShuffleService`.

### Why are the changes needed?

It is a small leak and I was asked to take care of it in https://github.com/apache/spark/pull/32114#discussion_r640270377.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually by adding a temporary log line to check `blockStatusByShuffleService` value before and after the `removeRdd` and run the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.

Closes #32790 from attilapiros/SPARK-35543.

Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
2021-06-07 15:37:19 +02:00
Dongjoon Hyun d4e32c896a [SPARK-35654][CORE] Allow ShuffleDataIO control DiskBlockManager.deleteFilesOnStop
### What changes were proposed in this pull request?

This PR aims to change `DiskBlockManager` like the following to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```

### Why are the changes needed?

`SparkContext` creates
1. `SparkEnv` (with `BlockManager` and its `DiskBlockManager`)
2. loads `ShuffleDataIO`
3. initialize block manager.
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)

...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
    }
...

_env.blockManager.initialize(_applicationId)
...
```

`DiskBlockManager` is created first at `BlockManager` constructor and we cannot change `deleteFilesOnStop` later at `ShuffleDataIO`. By switching to `var`, we can implement enhanced shuffle data management feature via `ShuffleDataIO` like https://github.com/apache/spark/pull/32730 .
```
  val diskBlockManager = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }
```

### Does this PR introduce _any_ user-facing change?

No. This is a private class.

### How was this patch tested?

N/A

Closes #32784 from dongjoon-hyun/SPARK-35654.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-06 09:20:42 -07:00
Wenchen Fan 63ab38f917 [SPARK-35396][CORE][FOLLOWUP] Free memory entry immediately
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32534 , and proposes to free the memory entry immediately instead of doing it in the backround asynchronously. The reason is:
1. It's a bit weird to free the resource in an asynchronous way.
2. We free the off-heap memory entry in the same thread, and it's better to be consistent with it.
3. We can simplify the code quite a bit.

This PR also simplifies the tests to reuse the class definition.

### Why are the changes needed?

code simplification

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #32743 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-03 21:54:27 -07:00