### What changes were proposed in this pull request?
Change the primaryResource assertion from an exact match to a suffix match in the SparkSubmitSuite test case `handles k8s cluster mode`.
### Why are the changes needed?
When I run SparkSubmitSuite on macOS 10.15.7, the `handles k8s cluster mode` test fails with an AssertionError after PR [SPARK-35691](https://issues.apache.org/jira/browse/SPARK-35691), because `File(path).getCanonicalFile().toURI()` returns a path beginning with `/System/Volumes/Data` when given an absolute path on macOS 10.15 and later.
e.g. `/home/testjars.jar` becomes `file:/System/Volumes/Data/home/testjars.jar`
To make the UT pass on macOS 10.15 and later, we change the assertion to a suffix match.
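A minimal sketch of the relaxed check, with illustrative values rather than the exact suite code:
```scala
// Hypothetical sketch of the assertion change; the values are illustrative.
val primaryResource = Some("file:/System/Volumes/Data/home/thejar.jar") // what macOS >= 10.15 produces

// Before: exact match, fails on macOS because of the extra "/System/Volumes/Data" prefix
// assert(primaryResource === Some("file:/home/thejar.jar"))

// After: suffix match, passes on both Linux and macOS
assert(primaryResource.exists(_.endsWith("home/thejar.jar")))
```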
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Pass the GitHub Action
2. Manually test
- environment: macOS > 10.15
- command: `build/mvn clean install -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -pl core -am -DwildcardSuites=org.apache.spark.deploy.SparkSubmitSuite -Dtest=none`
- Test result:
- Before this PR, the case failed with the following exception:
```
- handles k8s cluster mode *** FAILED ***
Some("file:/System/Volumes/Data/home/thejar.jar") was not equal to Some("file:/home/thejar.jar") (SparkSubmitSuite.scala:485)
Analysis:
Some(value: "file:/[System/Volumes/Data/]home/thejar.jar" -> "file:/[]home/thejar.jar")
```
- After this PR, all tests run successfully.
Closes#32948 from toujours33/SPARK-35796.
Authored-by: toujours33 <wangyazhi@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
PySpark added pinned thread mode at https://github.com/apache/spark/pull/24898 to sync Python threads to JVM threads. Previously, one JVM thread could be reused by multiple Python threads, which ends up with a messed-up inheritance hierarchy (such as thread locals), especially when multiple jobs run in parallel. To completely fix this, we should enable this mode by default.
### Why are the changes needed?
To correctly support parallel job submission and management.
### Does this PR introduce _any_ user-facing change?
Yes, now Python thread is mapped to JVM thread one to one.
### How was this patch tested?
Existing tests should cover it.
Closes#32429 from HyukjinKwon/SPARK-35303.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade `zstd-jni` to 1.5.0-2, which uses `zstd` version 1.5.0.
### Why are the changes needed?
Major improvements to Zstd support are targeted for the upcoming 3.2.0 release of Spark. Zstd 1.5.0 introduces significant compression (+25% to 140%) and decompression (~15%) speed improvements in benchmarks described in more detail on the releases page:
- https://github.com/facebook/zstd/releases/tag/v1.5.0
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The build passes its tests, but the benchmark tests seem flaky. I am unsure if this change is responsible. The error is:
```
Running org.apache.spark.rdd.CoalescedRDDBenchmark:
21/06/08 18:53:10 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
```
https://github.com/dchristle/spark/runs/2776123749?check_suite_focus=true
cc: dongjoon-hyun
Closes#32826 from dchristle/ZSTD150.
Lead-authored-by: David Christle <dchristle@squareup.com>
Co-authored-by: David Christle <dchristle@users.noreply.github.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In the current EventLoggingListener, SparkListenerExecutorMetricsUpdate messages are never written to the event log file:
```
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
  if (shouldLogStageExecutorMetrics) {
    event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
      liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
        // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
        // so record those peaks for all active stages.
        // Otherwise, record the peaks for the matching stage.
        if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
          val metrics = metricsPerExecutor.getOrElseUpdate(
            event.execId, new ExecutorMetrics())
          metrics.compareAndUpdatePeakValues(newPeaks)
        }
      }
    }
  }
}
```
In the History Server's REST API for executors, we can get each executor's metrics but not all of the driver's metrics. An executor's executor metrics can also be updated with TaskEnd events, etc.
So in this PR, I add support for logging the `SparkListenerExecutorMetricsUpdate` event of the `driver` when `spark.eventLog.logStageExecutorMetrics` is true.
### Why are the changes needed?
Let users get the driver's peakMemoryMetrics in the SHS.
### Does this PR introduce _any_ user-facing change?
Users can now get the driver's executor metrics from the SHS REST API.
### How was this patch tested?
Manual test
Closes#31992 from AngersZhuuuu/SPARK-34898.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This PR cleans up the code of `BlockManagerDecommissioner`. It includes a few changes:
* Only create the `BlockManagerDecommissioner` instance when shuffle or RDD blocks require migration:
there's no need to create a `BlockManagerDecommissioner` instance, or to check block migration in `shutdownThread`, if only `STORAGE_DECOMMISSION_ENABLED=true`.
* Shut down the migration thread more gracefully:
1. we'd better not log errors when `BlockManagerDecommissioner.stop()` is invoked explicitly. But currently, users will see
<details>
<summary>error message</summary>
```
21/01/04 20:11:52 ERROR BlockManagerDecommissioner: Error while waiting for block to migrate
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:83)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
</details>
2. don't shut down a thread pool like below, since `shutdown()` doesn't block waiting for running tasks to finish (see the sketch after this list):
```scala
executor.shutdown()
executor.shutdownNow()
```
* Avoid initiating `shuffleMigrationPool` when it's unnecessary:
Currently, it's always initiated even if shuffle block migration is disabled. (`BlockManagerDecommissioner.stop()` -> `stopOffloadingShuffleBlocks()` -> initiate `shuffleMigrationPool`)
* Unify the terminologies between `offload` and `migrate`:
replace `offload` with `migrate`
* Do not add back the shuffle blocks when it exceeds the max failure number:
this avoids unnecessary operations
* Do not try `decommissionRddCacheBlocks()` if we already know there are no available peers
* Clean up logs:
Currently, we have many different descriptions for the same thing, which is not good for the user experience
* Other cleanups
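As referenced in item 2 above, a hedged sketch of the more graceful shutdown pattern, using plain `java.util.concurrent` APIs (the timeout value is illustrative):
```scala
import java.util.concurrent.{Executors, TimeUnit}

val executor = Executors.newFixedThreadPool(4)
// shutdown() only stops accepting new tasks; it does not wait for running ones.
executor.shutdown()
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
  // Interrupt the remaining tasks only after the grace period has passed.
  executor.shutdownNow()
}
```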
### Why are the changes needed?
code clean up
### Does this PR introduce _any_ user-facing change?
Yes, users will not see misleading logs, e.g., the interrupted error.
### How was this patch tested?
Updated a unit test since we changed the behavior of creating the `BlockManagerDecommissioner` instance.
The other changes are only code cleanup, so they won't cause behaviour changes and passing the existing tests should be enough.
Closes#31102 from Ngone51/stop-decommission-gracefully.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Cache commonly occurring duplicate `Some` objects in SQLMetrics by using a Guava cache, and reuse the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also, with AccumulatorV2 we have seen a lot of `Some(-1L)` and `Some(0L)` occurrences in a heap dump; these are naively interned by reusing an already constructed `Some(-1L)` and `Some(0L)`.
To give some context on the impact and the garbage that accumulated, below are the details of a complex Spark job that we troubleshooted to figure out the bottlenecks. **tl;dr - In short, the major issues were the accumulation of duplicate objects, mainly from SQLMetrics.**
More than 25% of the 40G driver heap was filled with (a very large number of) **duplicate**, immutable objects.
1. Very large number of **duplicate** immutable objects.
- The type of a metric is represented by `scala.Some("sql")` - which is created for each metric.
- Fixing this reduced memory usage from 4GB to a few bytes.
2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate the absence of a metric).
- Individually the values are all immutable, but Spark SQL was creating a new instance each time.
- Interning these resulted in saving ~4.5GB for a 40G heap.
3. Using string interpolation for metric names.
- Interpolation results in the creation of a new string object.
- We end up with a very large number of metric names - though the number of unique strings is minuscule.
- ~7.5 GB in the 40 GB heap, which went down to a few KB when fixed.
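A rough sketch of the interning idea described above; the names and structure are illustrative, not the actual SQLMetrics code:
```scala
import com.google.common.collect.Interners

// Reuse one canonical instance for very common values instead of allocating a new one each time.
object MetricInterning {
  private val stringInterner = Interners.newWeakInterner[String]()
  private val someNegativeOne = Some(-1L)
  private val someZero = Some(0L)

  def internValue(v: Long): Option[Long] = v match {
    case -1L => someNegativeOne // common "metric absent" marker
    case 0L  => someZero        // common initial value
    case _   => Some(v)
  }

  // Avoid keeping many copies of identical metric-name strings produced by interpolation.
  def internName(name: String): String = stringInterner.intern(name)
}
```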
### Why are the changes needed?
To reduce overall driver memory footprint which eventually reduces the Full GC pauses.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Since these are memory-related optimizations, unit tests are not added. These changes were added to our internal platform, where, together with another set of optimizations, they made it possible for a complex Spark job that had been continuously failing to succeed.
Closes#32754 from venkata91/SPARK-35613.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
`addFile/addJar/addDirectory` should put the canonical file
### Why are the changes needed?
I met the error below:
```
21/06/07 00:06:57 ERROR SparkContext: Failed to add file:/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File spark-core_2.12-3.2.0-SNAPSHOT-tests.jar was already registered with a different path (old path = /home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar, new path = /home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar
```
But actually, `/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar` and `/home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar` are the same.
I think we should put the canonical file in the ConcurrentHashMap.
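A hedged sketch of the idea, using a plain map rather than the actual SparkContext field:
```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Key the map by the canonical path so "/a/./b.jar" and "/a/b.jar" share one entry.
val addedJars = new ConcurrentHashMap[String, Long]()

def addJar(path: String, timestamp: Long): Unit = {
  val canonicalPath = new File(path).getCanonicalPath
  addedJars.putIfAbsent(canonicalPath, timestamp)
}

// Both calls resolve to the same key, so the second one no longer looks like
// the same file being "registered with a different path".
addJar("/home/runner/work/spark/spark/./core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar", 1L)
addJar("/home/runner/work/spark/spark/core/target/scala-2.12/spark-core_2.12-3.2.0-SNAPSHOT-tests.jar", 2L)
```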
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the CIs.
Closes#32845 from pingsutw/SPARK-35691.
Authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Bug fix for deadlock during the executor shutdown
### Why are the changes needed?
When an executor receives a TERM signal, it (the second TERM signal) locks the `java.lang.Shutdown` class and then calls `Shutdown.exit()` to exit the JVM.
Shutdown calls SparkShutdownHook to shut down the executor.
During the executor shutdown phase, a RemoteProcessDisconnected event is sent to the RPC inbox, and then WorkerWatcher tries to call System.exit(-1) again.
Because `java.lang.Shutdown` is already locked, a deadlock occurs.
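A minimal standalone illustration of this kind of deadlock (not Spark code; running it intentionally hangs):
```scala
object ExitDuringShutdownDemo {
  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      // This second exit() waits for the java.lang.Shutdown lock held by the first exit(),
      // while the first exit() waits for this hook to finish: a deadlock.
      System.exit(-1)
    }))
    System.exit(0) // analogous to the first TERM signal triggering JVM shutdown
  }
}
```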
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test case "task reaper kills JVM if killed tasks keep running for too long" in JobCancellationSuite
Closes#32868 from wankunde/SPARK-35714.
Authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Previously, the following two commits allow driver-owned on-demand PVC reuse.
- SPARK-35182 Support driver-owned on-demand PVC
- SPARK-35416 Support PersistentVolumeClaim Reuse
This PR aims to recover the shuffle data on those remounted PVCs. The lifecycle of the PVCs is tied to that of the Spark job. Since this is a K8s-specific feature, the `ShuffleDataIO` plugin is used.
### Why are the changes needed?
Although the pod is killed, we can remount the PVCs and recover some data from them.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the newly added test cases.
Closes#32730 from dongjoon-hyun/SPARK-RECOVER-SHUFFLE-DATA.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
### Summary of changes:
Executors will create the merge directories under the application temp directory provided by YARN. The access control of the folder will be set to 770, so that the shuffle service can create merged shuffle files and write merged shuffle data into those files.
Serve merged shuffle block fetch requests by reading the merged shuffle blocks.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Ye Zhou <yezhou@linkedin.com>
Closes#32007 from zhouyejoe/SPARK-33350.
Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Summary of the changes made as part of this PR:
1. `DAGScheduler` changes to finalize a ShuffleMapStage which involves talking to all the shuffle mergers (`ExternalShuffleService`) and getting all the completed merge statuses.
2. Once the `ShuffleMapStage` finalization is complete, mark the `ShuffleMapStage` to be finalized which marks the stage as complete and subsequently letting the child stage start.
3. Also added the relevant tests to `DAGSchedulerSuite` for changes made as part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919)
Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
### Why are the changes needed?
Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests to DAGSchedulerSuite
Closes#30691 from venkata91/SPARK-32920.
Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32534. That PR proposed a use case to use `DeserializedMemoryEntry` to store off-heap data, and let Spark release the memory via the `AutoCloseable` interface. However, there is one more problem: `DeserializedMemoryEntry` always reports its size as on-heap size, which is inaccurate. If the Spark cluster is configured with a small on-heap size and a large off-heap size, this will trigger a lot of spilling.
This PR makes `DeserializedMemoryEntry` truly support off-heap data. Now the caller side can cache off-heap data with a new storage level `OFF_HEAP_ONLY_DESER`.
### Why are the changes needed?
correct the memory counting for off-heap data.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
updated test
Closes#32800 from cloud-fan/follow.
Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.
### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
Closes#32582 from xuanyuanking/SPARK-35436.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Currently spark.jars.xxx property keys (e.g. spark.jars.ivySettings and spark.jars.packages) are hardcoded in multiple places within Spark code across multiple modules. We should define them in config/package.scala and reference them in all other places.
### Why are the changes needed?
improvement
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
no
Closes#32746 from dgd-contributor/SPARK-35074_configs_should_be_moved_to_config_package.scala.
Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
In `BlockManagerMasterEndpoint`, for disk-persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled) we keep track of the block status entries per external shuffle service instance (so on YARN we basically keep them per node). This is the `blockStatusByShuffleService` member val. When all the RDD blocks are removed for one external shuffle service instance, the key and the empty map can be removed from `blockStatusByShuffleService`.
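A hedged sketch of the cleanup; the types and names are simplified stand-ins for the real `blockStatusByShuffleService` structure:
```scala
import scala.collection.mutable

// node (external shuffle service instance) -> blockId -> status
val blockStatusByShuffleService =
  mutable.HashMap.empty[String, mutable.HashMap[String, String]]

def removeRddBlock(node: String, blockId: String): Unit = {
  blockStatusByShuffleService.get(node).foreach { blocks =>
    blocks.remove(blockId)
    if (blocks.isEmpty) {
      // The fix: drop the now-empty inner map so the outer map does not keep
      // one entry per node after all of its RDD blocks are removed.
      blockStatusByShuffleService.remove(node)
    }
  }
}
```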
### Why are the changes needed?
It is a small leak and I was asked to take care of it in https://github.com/apache/spark/pull/32114#discussion_r640270377.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually, by adding a temporary log line to check the `blockStatusByShuffleService` value before and after the `removeRdd`, and running the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.
Closes#32790 from attilapiros/SPARK-35543.
Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
### What changes were proposed in this pull request?
This PR aims to change `DiskBlockManager` like the following to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```
### Why are the changes needed?
`SparkContext`
1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
2. loads `ShuffleDataIO`,
3. initializes the block manager.
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)
...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
_shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
_conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
}
...
_env.blockManager.initialize(_applicationId)
...
```
`DiskBlockManager` is created first at `BlockManager` constructor and we cannot change `deleteFilesOnStop` later at `ShuffleDataIO`. By switching to `var`, we can implement enhanced shuffle data management feature via `ShuffleDataIO` like https://github.com/apache/spark/pull/32730 .
```
val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
new DiskBlockManager(conf, deleteFilesOnStop)
}
```
### Does this PR introduce _any_ user-facing change?
No. This is a private class.
### How was this patch tested?
N/A
Closes#32784 from dongjoon-hyun/SPARK-35654.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32534 and proposes to free the memory entry immediately instead of doing it asynchronously in the background. The reasons are:
1. It's a bit weird to free the resource in an asynchronous way.
2. We free the off-heap memory entry in the same thread, and it's better to be consistent with that.
3. We can simplify the code quite a bit.
This PR also simplifies the tests to reuse the class definition.
### Why are the changes needed?
code simplification
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#32743 from cloud-fan/follow.
Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This removes the accidental duplicated test coverage.
### Why are the changes needed?
To save the test resources.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A because this is a removal of the duplicated test coverage.
Closes#32774 from dongjoon-hyun/SPARK-35589-3.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch proposes a fix to prevent triggering BlockManager re-registration while a `StopExecutor` msg is in flight.
Here, on receiving a `StopExecutor` msg, we do not remove the corresponding `BlockManagerInfo` from the `blockManagerInfo` map; instead we mark it as dead by updating the corresponding `executorRemovalTs`. A separate cleanup thread runs periodically to remove the stale `BlockManagerInfo` entries from the `blockManagerInfo` map.
Now if a recently removed `BlockManager` tries to register, the driver simply ignores it since the `blockManagerInfo` map already contains an entry for it. The same applies to `BlockManagerHeartbeat`: if the BlockManager belongs to a recently removed executor, the `blockManagerInfo` map will contain an entry and we shall not ask the corresponding `BlockManager` to re-register.
### Why are the changes needed?
These changes are needed since BlockManager re-registration while an executor is shutting down causes inconsistent bookkeeping of executors in Spark.
Consider the following scenario:
- `CoarseGrainedSchedulerBackend` issues async `StopExecutor` on executorEndpoint
- `CoarseGrainedSchedulerBackend` removes that executor from Driver's internal data structures and publishes `SparkListenerExecutorRemoved` on the `listenerBus`.
- Executor has still not processed `StopExecutor` from the Driver
- Driver receives heartbeat from the Executor, since it cannot find the `executorId` in its data structures, it responds with `HeartbeatResponse(reregisterBlockManager = true)`
- `BlockManager` on the Executor reregisters with the `BlockManagerMaster` and `SparkListenerBlockManagerAdded` is published on the `listenerBus`
- Executor starts processing the `StopExecutor` and exits
- `AppStatusListener` picks the `SparkListenerBlockManagerAdded` event and updates `AppStatusStore`
- `statusTracker.getExecutorInfos` refers to `AppStatusStore` to get the list of executors, which returns the dead executor as alive.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Modified the existing unit tests.
- Ran a simple test application on minikube that asserts that the number of executors is zero once the executor idle timeout is reached.
Closes#32114 from sumeetgajjar/SPARK-35011.
Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
A test case of AdaptiveQueryExecSuite becomes flaky since there are too many debug logs in the RootLogger:
- https://github.com/Yikun/spark/runs/2715222392?check_suite_focus=true
- https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139125/testReport/
To fix it, I suggest supporting multiple loggers in the testing method `withLogAppender`, so that the `LogAppender` gets clean target log output.
### Why are the changes needed?
Fix a flaky test case.
Also, reduce unnecessary memory cost in tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes#32725 from gengliangwang/fixFlakyLogAppender.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
This PR aims to make `BlockManagerMasterEndpoint.updateBlockInfo` not to ignore index-only shuffle files.
In addition, this PR fixes `IndexShuffleBlockResolver.getMigrationBlocks` to return data files first.
### Why are the changes needed?
When [SPARK-20629](a4ca355af8) introduced worker decommissioning, index-only shuffle files were not considered properly.
- SPARK-33198 fixed `getMigrationBlocks` to handle index-only shuffle files
- SPARK-35589 (this) aims to fix `updateBlockInfo` to handle index-only shuffle files.
### Does this PR introduce _any_ user-facing change?
No. This is a bug fix.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes#32727 from dongjoon-hyun/SPARK-UPDATE-OUTPUT.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Different UIs, e.g. the History Server or the Spark live UI, may need different capabilities to handle HTTP requests. Usually, a History Server serves multiple users and needs more threads to increase concurrency, while the live UI is per application and doesn't need such a large pool size.
In this PR, we increase the max pool size of the History Server's Jetty backend.
### Why are the changes needed?
increase the client concurrency of HistoryServer
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
Closes#32539 from yaooqinn/SPARK-35402.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Fixed tooltip for "Storage" tab in UI
### Why are the changes needed?
Tooltip correction was needed
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Manually tested
Closes#32664 from lidiyag/storagewebui.
Authored-by: lidiyag <lidiya.nixon@huawei.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
```
- Upload multi stages *** FAILED ***
The code passed to eventually never returned normally. Attempted 20 times over 10.011176743 seconds. Last failure message: fallbackStorage.exists(0, file) was false. (FallbackStorageSuite.scala:243)
```
The error above was raised randomly on aarch64 and also in GitHub Actions test runs [1][2].
[1] https://github.com/apache/spark/actions/runs/489319612
[2] https://github.com/apache/spark/actions/runs/479317320
### Why are the changes needed?
The timeout is too short; it needs to be increased to let the test case complete.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.storage.FallbackStorageSuite -pl :spark-core_2.12`
Closes#32719 from Yikun/SPARK-35584.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
After SPARK-29291 and SPARK-33352, there are still some compilation warnings about `procedure syntax is deprecated` as follows:
```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:723: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `registerMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:748: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `unregisterMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala:223: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testSimpleSpillingForAllCodecs`'s return type
[WARNING] [Warn] /spark/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASBenchmark.scala:53: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `runBLASBenchmark`'s return type
[WARNING] [Warn] /spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:110: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `assertEmptyRootPath`'s return type
[WARNING] [Warn] /spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:602: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `executeCTASWithNonEmptyLocation`'s return type
```
So the main change of this PR is to clean up these compilation warnings.
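For illustration, the fix for each warning is mechanical, e.g. (parameter list and body are illustrative, not the exact method):
```scala
// Deprecated procedure syntax (warns under Scala 2.13):
// def registerMergeResult(shuffleId: Int, reduceId: Int) { ... }

// With an explicit Unit return type, accepted by both Scala 2.12 and 2.13:
def registerMergeResult(shuffleId: Int, reduceId: Int): Unit = {
  // ...
}
```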
### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13; this change should be compatible with Scala 2.12.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#32669 from LuciferYang/re-clean-procedure-syntax.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from 3.7.0-M5 to 3.7.0-M11.
Note: json4s versions greater than 3.7.0-M11 are not binary compatible with Spark third-party jars.
### Why are the changes needed?
Multiple defect fixes and improvements, like:
- https://github.com/json4s/json4s/issues/750
- https://github.com/json4s/json4s/issues/554
- https://github.com/json4s/json4s/issues/715
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32636 from vinodkc/br_build_upgrade_json4s.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This PR proposes an add-on to support manually closing entries in MemoryStore and InMemoryRelation.
### What changes were proposed in this pull request?
Currently:
MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all on-heap or off-heap entries.
When memoryStore.remove(blockId) is called, the code simply removes one entry from the LinkedHashMap and leverages Java GC to do the release work.
This PR:
We propose an add-on to manually close any object stored in MemoryStore and InMemoryRelation if the object extends AutoCloseable.
Verification:
In our own use case, we implemented a user-defined off-heap HashRelation for BHJ, and we verified that by adding this manual close, our off-heap HashRelation is released when evict is called.
Also, we implemented a user-defined CachedBatch that is released when InMemoryRelation.clearCache() is called, using this PR.
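A hedged sketch of the manual-close idea, independent of the actual MemoryStore internals:
```scala
import scala.collection.mutable

final case class MemoryEntry(value: AnyRef)

val entries = mutable.LinkedHashMap.empty[String, MemoryEntry]

def remove(blockId: String): Unit = {
  entries.remove(blockId).foreach { entry =>
    entry.value match {
      case closeable: AutoCloseable => closeable.close() // eagerly release off-heap resources
      case _ => // ordinary on-heap values are left to the GC as before
    }
  }
}
```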
### Why are the changes needed?
This change helps to clean up off-heap user-defined objects that may be cached in InMemoryRelation or MemoryStore.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
WIP
Signed-off-by: Chendi Xue <chendi.xueintel.com>
Closes#32534 from xuechendi/support_manual_close_in_memorystore.
Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
When a memory reservation triggers a self-spill, `ExecutionMemoryPool#releaseMemory()` will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool, they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking it to spill again.
This PR adds logic to TaskMemoryManager to detect this case and retry.
### Why are the changes needed?
This bug affects queries with a MemoryConsumer that can spill part of its memory, such as BytesToBytesMap. If the MemoryConsumer is using all available memory and there is a waiting task, then attempting to acquire more memory on the MemoryConsumer will trigger a partial self-spill. However, because the waiting task gets priority, the attempt to acquire memory will fail even if it could have been satisfied by another spill.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test to MemoryManagerSuite that previously failed and now passes.
Closes#32625 from ankurdave/SPARK-35486.
Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
Print the invalid value in the config validation error message for `checkValue`, just like `checkValues` does.
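A hedged sketch of the message change; the helper name and signature are illustrative, not the exact ConfigBuilder API:
```scala
// Include the offending value and the config key in the failure, so the user
// can tell which setting is wrong without digging through the stack trace.
def checkValue[T](key: String, value: T, validator: T => Boolean, errorMsg: String): T = {
  require(validator(value), s"'$value' in $key is invalid. $errorMsg")
  value
}

// e.g. checkValue("spark.network.timeout", -120, (v: Int) => v > 0, "Timeout must be positive.")
```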
### Why are the changes needed?
Invalid configuration values may come from many places; this PR helps different kinds of users and developers identify which config the error is related to.
### Does this PR introduce _any_ user-facing change?
yes, but only error msg
### How was this patch tested?
yes, modified tests
Closes#32600 from yaooqinn/SPARK-35456.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes a workaround to address the Netty OOM issue (SPARK-24989, SPARK-27991):
Basically, `ShuffleBlockFetcherIterator` would catch the `OutOfDirectMemoryError` from Netty and then set a global flag for the shuffle module. Any pending fetch requests are deferred, as long as there are in-flight requests, until the flag is unset. The flag is unset when a fetch request succeeds.
Note that catching the Netty OOM rather than aborting the application is feasible because Netty manages its own memory region (off-heap by default) separately, so a Netty OOM doesn't mean a memory shortage in Spark.
### Why are the changes needed?
The Netty OOM issue is a very corner case. It usually happens in large-scale clusters, where a reduce task could fetch shuffle blocks from hundreds of nodes concurrently in a short time. Internally, we found a cluster that had created 260+ clients within 6s before throwing the Netty OOM.
Although Spark has configurations, e.g., `spark.reducer.maxReqsInFlight`, to tune the number of concurrent requests, it's usually not an easy decision for the user to set a reasonable value with regard to the workloads, machine resources, etc. But with this fix, Spark would heal the Netty memory issue itself without any specific configuration.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes#32287 from Ngone51/SPARK-27991.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
1. In HadoopMapReduceCommitProtocol, create parent directory before renaming custom partition path staging files
2. In InMemoryCatalog and HiveExternalCatalog, create new partition directory before renaming old partition path
3. Check the return value of FileSystem#rename; if false, throw an exception to avoid silent data loss caused by a rename failure
4. Change DebugFilesystem#rename behavior to match HDFS's behavior (return false without renaming when the dst parent directory does not exist)
### Why are the changes needed?
Depending on the FileSystem#rename implementation, when the destination directory does not exist, the file system may
1. return false without renaming the file or throwing an exception (e.g. HDFS), or
2. create the destination directory, rename the files, and return true (e.g. LocalFileSystem)
In the first case above, renames in HadoopMapReduceCommitProtocol for custom partition paths will fail silently if the destination partition path does not exist. Failed renames can happen when
1. dynamicPartitionOverwrite == true and the custom partition path directories are deleted by the job before the rename; or
2. the custom partition path directories do not exist before the job; or
3. something else goes wrong when the file system handles `rename`
The renames in InMemoryCatalog and HiveExternalCatalog for partition renaming have a similar issue.
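A hedged sketch of the defensive rename described above, using the Hadoop `FileSystem` API directly (the helper name is illustrative):
```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

def renameOrThrow(fs: FileSystem, src: Path, dst: Path): Unit = {
  val parent = dst.getParent
  if (parent != null && !fs.exists(parent)) {
    // HDFS rename() returns false (without renaming) if the destination parent is missing.
    fs.mkdirs(parent)
  }
  if (!fs.rename(src, dst)) {
    // Surface the failure instead of silently losing the data.
    throw new IOException(s"Failed to rename $src to $dst")
  }
}
```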
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified DebugFilesystem#rename, and added new unit tests.
Without the fix in src code, five InsertSuite tests and one AlterTableRenamePartitionSuite test failed:
InsertSuite.SPARK-20236: dynamic partition overwrite with custom partition path (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![2,1,1]
```
InsertSuite.SPARK-35106: insert overwrite with custom partition path
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![2,1,1]
```
InsertSuite.SPARK-35106: dynamic partition overwrite with custom partition path
```
== Results ==
!== Correct Answer - 2 == == Spark Answer - 1 ==
!struct<> struct<i:int,part1:int,part2:int>
[1,1,1] [1,1,1]
![1,1,2]
```
InsertSuite.SPARK-35106: Throw exception when rename custom partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```
InsertSuite.SPARK-35106: Throw exception when rename dynamic partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```
AlterTableRenamePartitionSuite.ALTER TABLE .. RENAME PARTITION V1: multi part partition (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![3,123,3]
```
Closes#32530 from YuzhouSun/SPARK-35106.
Authored-by: Yuzhou Sun <yuzhosun@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Introduce new shared methods to `ShuffleBlockFetcherIteratorSuite` to replace copy-pasted code. Use modern, Scala-like Mockito `Answer` syntax.
### Why are the changes needed?
`ShuffleBlockFetcherIteratorSuite` has tons of duplicate code, like 0494dc90af/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala (L172-L185). It's challenging to tell what the interesting parts are vs. what is just being set to some default/unused value.
Similarly but not as bad, there are many calls like the following
```
verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer ...
```
These changes result in about 10% reduction in both lines and characters in the file:
```bash
# Before
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
1063 3950 43201 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
# After
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
928 3609 39053 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
```
It also helps readability, e.g.:
```
val iterator = createShuffleBlockIteratorWithDefaults(
transfer,
blocksByAddress,
maxBytesInFlight = 1000L
)
```
Now I can clearly tell that `maxBytesInFlight` is the main parameter we're interested in here.
### Does this PR introduce _any_ user-facing change?
No, test only. There aren't even any behavior changes, just refactoring.
### How was this patch tested?
Unit tests pass.
Closes#32389 from xkrogen/xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Add a common function `getWorkspaceFilePath` (which prefixes the path with the Spark home) to `SparkFunSuite`, and apply the function where the logic was extracted from.
### Why are the changes needed?
Spark SQL has test suites that read resources when running tests. The way of getting the resource path is commonly used across different suites. We can extract it into a function to ease code maintenance.
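A hedged sketch of such a helper (the exact lookup and signature in the test base class may differ):
```scala
import java.nio.file.{Path, Paths}

// Resolve a file path against the Spark workspace root so individual suites
// don't have to repeat the "find the Spark home" logic.
def getWorkspaceFilePath(first: String, more: String*): Path = {
  val sparkHome = sys.props.getOrElse("spark.test.home", sys.env.getOrElse("SPARK_HOME", "."))
  Paths.get(sparkHome, (first +: more): _*)
}

// e.g. getWorkspaceFilePath("sql", "core", "src", "test", "resources", "test-data")
```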
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass existing tests.
Closes#32315 from Ngone51/extract-common-file-path.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
make these threads easier to identify in thread dumps
### Why are the changes needed?
make these threads easier to identify in thread dumps
### Does this PR introduce _any_ user-facing change?
yes. Driver thread dumps will show the timers with pretty names
### How was this patch tested?
verified locally
Closes#32549 from yaooqinn/SPARK-35404.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/32436 which broke JavaScript linter. There was a logical conflict - the linter was added after the last successful test run in that PR.
```
added 118 packages in 1.482s
/__w/spark/spark/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
34:41 error 'type' is defined but never used. Allowed unused args must match /^_ignored_.*/u no-unused-vars
34:47 error 'row' is defined but never used. Allowed unused args must match /^_ignored_.*/u no-unused-vars
35:1 error Expected indentation of 2 spaces but found 4 indent
36:1 error Expected indentation of 4 spaces but found 7 indent
37:1 error Expected indentation of 2 spaces but found 4 indent
38:1 error Expected indentation of 4 spaces but found 7 indent
39:1 error Expected indentation of 2 spaces but found 4 indent
556:1 error Expected indentation of 14 spaces but found 16 indent
557:1 error Expected indentation of 14 spaces but found 16 indent
```
### Why are the changes needed?
To recover the build
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Manually tested:
```bash
./dev/lint-js
lint-js checks passed.
```
Closes#32541 from HyukjinKwon/SPARK-34764-followup.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Adds the executor loss reason to the Spark web UI and, in doing so, also fixes the Kube integration to pass the exec loss reason into core.
UI change:
![image](https://user-images.githubusercontent.com/59893/117045762-b975ba80-acc4-11eb-9679-8edab3cfadc2.png)
### Why are the changes needed?
Debugging Spark jobs is *hard*, making it clearer why executors have exited could help.
### Does this PR introduce _any_ user-facing change?
Yes a new column on the executor page.
### How was this patch tested?
K8s unit test updated to validate exec loss reasons are passed through regardless of exec alive state, manual testing to validate the UI.
Closes#32436 from holdenk/SPARK-34764-propegate-reason-for-exec-loss.
Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
Currently Spark does not allow setting spark.driver.memory, spark.executor.cores, or spark.executor.memory to 0, but it allows driver cores to be 0. This PR checks the driver core count as well. Thanks to Oleg Lypkan for finding this.
### Why are the changes needed?
To make the configuration check consistent.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual testing
Closes#32504 from shahidki31/shahid/drivercore.
Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to improve S3A magic committer support by inferring all missing configs from a single minimum configuration, `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled=true`.
Given that AWS S3 has provided [strong read-after-write consistency](https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/) since December 2020, we can ignore DynamoDB-related configurations. As a result, the minimum set of configurations is the following:
```
spark.hadoop.fs.s3a.committer.magic.enabled=true
spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled=true
spark.hadoop.fs.s3a.committer.name=magic
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```
### Why are the changes needed?
To use the S3A magic committer in Apache Spark, users need to set up several configurations. If something is missed, it ends up with error messages like the following.
```
Exception in thread "main" org.apache.hadoop.fs.s3a.commit.PathCommitException:
`s3a://my-spark-bucket`: Filesystem does not have support for 'magic' committer enabled in configuration option fs.s3a.committer.magic.enabled
at org.apache.hadoop.fs.s3a.commit.CommitUtils.verifyIsMagicCommitFS(CommitUtils.java:74)
at org.apache.hadoop.fs.s3a.commit.CommitUtils.getS3AFileSystem(CommitUtils.java:109)
```
### Does this PR introduce _any_ user-facing change?
Yes, after this improvement, all Spark users can use the S3A committer with a single configuration.
```
spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled=true
```
This PR infers the missing configurations, so there is no side effect for existing users who already have all the configurations set.
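For example, a job could then be written with only the per-bucket flag set (the bucket name and output path below are illustrative):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-magic-committer-example")
  // The single remaining setting; the other committer configs are inferred.
  .config("spark.hadoop.fs.s3a.bucket.my-spark-bucket.committer.magic.enabled", "true")
  .getOrCreate()

spark.range(10).write.parquet("s3a://my-spark-bucket/output/")
```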
### How was this patch tested?
Pass the CIs with the newly added test cases.
Closes#32518 from dongjoon-hyun/SPARK-35383.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR proposes to introduce three new configurations to limit the maximum number of jobs/stages/executors shown on the timeline view.
### Why are the changes needed?
If the number of items on the timeline view grows beyond 1000, rendering can be significantly slow.
https://issues.apache.org/jira/browse/SPARK-35229
The maximum number of tasks on the timeline is already limited by `spark.ui.timeline.tasks.maximum`, so I propose to mitigate this issue in the same manner.
### Does this PR introduce _any_ user-facing change?
Yes. The maximum number of items shown on the timeline view is limited.
I propose a default value of 500 for jobs and stages, and 250 for executors.
Since an executor has at most 2 items (added and removed), 250 is chosen.
### How was this patch tested?
I manually confirm this change works with the following procedures.
```
# launch a cluster
$ bin/spark-shell --conf spark.ui.retainedDeadExecutors=300 --master "local-cluster[4, 1, 1024]"
// Confirm the maximum number of jobs
(1 to 1000).foreach { _ => sc.parallelize(List(1)).collect }
// Confirm the maximum number of stages
var df = sc.parallelize(1 to 2)
(1 to 1000).foreach { i => df = df.repartition(i % 5 + 1) }
df.collect
// Confirm the maximum number of executors
(1 to 300).foreach { _ => try sc.parallelize(List(1)).foreach { _ => System.exit(0) } catch { case e => }}
```
Screenshots here.
![jobs_limited](https://user-images.githubusercontent.com/4736016/116386937-3e8c4a00-a855-11eb-8f4c-151cf7ddd3b8.png)
![stages_limited](https://user-images.githubusercontent.com/4736016/116386990-49df7580-a855-11eb-9f71-8e129e3336ab.png)
![executors_limited](https://user-images.githubusercontent.com/4736016/116387009-4f3cc000-a855-11eb-8697-a2eb4c9c99e6.png)
Closes#32381 from sarutak/mitigate-timeline-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
### What changes were proposed in this pull request?
Now the Spark executor can already be used with the Kubernetes scheduler, so we should modify the annotation in Executor.scala.
### Why are the changes needed?
only comment
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
no
Closes#32426 from jerqi/master.
Authored-by: RoryQi <1242949407@qq.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR proposes to add linter for JavaScript source files.
[ESLint](https://eslint.org/) seems to be a popular linter for JavaScript so I choose it.
### Why are the changes needed?
Linter enables us to check style and keeps code clean.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually run `dev/lint-js` (Node.js and npm are required).
In this PR, mainly the indentation style is also fixed and the linter passes.
Closes#32274 from sarutak/introduce-eslint.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Set `IS_TESTING` to true in `BenchmarkBase`, before running benchmarks.
### Why are the changes needed?
Currently benchmarks can be run in 2 ways: via `spark-submit`, or via an SBT command. However, with the former, Spark misses some properties such as `IS_TESTING`, which is necessary to turn certain behavior on/off, like codegen (`spark.sql.codegen.factoryMode`). Therefore, the result could differ between the two. In addition, the benchmark GitHub workflow uses the spark-submit approach.
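A hedged sketch of the idea (the property key is assumed to be the one Spark's `Utils.isTesting` checks):
```scala
object BenchmarkBaseSketch {
  def main(args: Array[String]): Unit = {
    // Mark this JVM as a test run before any benchmark starts, so behavior gated on
    // "is testing" (e.g. the codegen factory mode) matches SBT-based runs.
    System.setProperty("spark.testing", "true")
    // ... run the benchmarks ...
  }
}
```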
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A
Closes#32440 from sunchao/SPARK-35315.
Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This PR fixes a bug in [SPARK-35266](https://issues.apache.org/jira/browse/SPARK-35266) that creates benchmark files in the invalid path with the wrong name.
e.g. For `BLASBenchmark`,
- AS-IS: Creates `benchmarksBLASBenchmark-results.txt` in `{SPARK_HOME}/mllib-local/`
- TO-BE: Creates `BLASBenchmark-results.txt` in `{SPARK_HOME}/mllib-local/benchmarks/`
### Why are the changes needed?
As you can see in the above example, new benchmark files cannot be created as intended due to this bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
After building Spark, manually tested with the following command:
```
SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
org.apache.spark.benchmark.Benchmarks --jars \
"`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
"`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
"org.apache.spark.ml.linalg.BLASBenchmark"
```
It successfully generated the benchmark files as intended (`BLASBenchmark-results.txt` in `{SPARK_HOME}/mllib-local/benchmarks/`).
Closes#32432 from byungsoo-oh/SPARK-35308.
Lead-authored-by: byungsoo <byungsoo@byungsoo-pc.tn.corp.samsungelectronics.net>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes an error in `BenchmarkBase.scala` that occurs when creating a benchmark file in a non-existent directory.
### Why are the changes needed?
When submitting a benchmark job using `org.apache.spark.benchmark.Benchmarks` class with `SPARK_GENERATE_BENCHMARK_FILES=1` option, an exception is raised if the directory where the benchmark file will be generated does not exist.
For more information, please refer to [SPARK-35266](https://issues.apache.org/jira/browse/SPARK-35266).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
After building Spark, manually tested with the following command:
```
SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
org.apache.spark.benchmark.Benchmarks --jars \
"`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
"`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
"org.apache.spark.ml.linalg.BLASBenchmark"
```
It successfully generated the benchmark result files.
**Why it is sufficient:**
As illustrated in the comments in `Benchmarks.scala`, the command below runs all benchmarks and generates the results:
```
SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
org.apache.spark.benchmark.Benchmarks --jars \
"`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
"`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
"*"
```
Of all the benchmarks (55 benchmarks in total), only `BLASBenchmark` fails due to the proposed issue for the current code in the master branch. Thus, it is currently sufficient to test `BLASBenchmark` to validate this change.
Closes#32394 from byungsoo-oh/SPARK-35266.
Authored-by: byungsoo <byungsoo@byungsoo-pc.tn.corp.samsungelectronics.net>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
With this PR Spark avoids creating multiple monitor threads for the same worker and same task context.
### Why are the changes needed?
Without this change, unnecessary threads will be created. It can even cause job failure, for example when a coalesce (without shuffle) from a high partition number goes to a very low one. The exception below comes from exactly such a run:
```
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.210 executor driver): java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2260)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2210)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2210)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1083)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1083)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1083)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2449)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2391)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2380)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:872)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2220)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2260)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2285)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2260)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested with the following Python script (`reproduce-SPARK-35009.py`):
```
import pyspark
conf = pyspark.SparkConf().setMaster("local[*]").setAppName("Test1")
sc = pyspark.SparkContext.getOrCreate(conf)
rows = 70000
data = list(range(rows))
rdd = sc.parallelize(data, rows)
assert rdd.getNumPartitions() == rows
rdd0 = rdd.filter(lambda x: False)
data = rdd0.coalesce(1).collect()
assert data == []
```
Spark submit:
```
$ ./bin/spark-submit reproduce-SPARK-35009.py
```
#### With this change
Checking the number of monitor threads with jcmd:
```
$ jcmd
85273 sun.tools.jcmd.JCmd
85227 org.apache.spark.deploy.SparkSubmit reproduce-SPARK-35009.py
41020 scala.tools.nsc.MainGenericRunner
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
...
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
$ jcmd 85227 Thread.print | grep -c "Monitor for python"
2
```
<img width="859" alt="Screenshot 2021-04-14 at 16 06 51" src="https://user-images.githubusercontent.com/2017933/114731755-4969b980-9d42-11eb-8ec5-f60b217bdd96.png">
#### Without this change
```
...
$ jcmd 90052 Thread.print | grep -c "Monitor for python"
5645
..
```
<img width="856" alt="Screenshot 2021-04-14 at 16 30 18" src="https://user-images.githubusercontent.com/2017933/114731724-4373d880-9d42-11eb-9f9b-d976bf2530e2.png">
Closes#32169 from attilapiros/SPARK-35009.
Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
### What changes were proposed in this pull request?
`failureMessage` is already formatted, but `replaceAll("\n", " ")` destroyed that formatting. This PR fixes it.
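A tiny illustration of the problem, using a made-up multi-line message (runnable in the Scala REPL):
```
// Illustration only: a made-up, multi-line failure message.
val failureMessage =
  """Job aborted due to stage failure:
    |  Task 0 in stage 1.0 failed 4 times, most recent failure:
    |    java.lang.RuntimeException: boom""".stripMargin

// Flattening the newlines collapses the message into one hard-to-read line.
println(failureMessage.replaceAll("\n", " "))

// Keeping the message unchanged preserves the intended multi-line formatting.
println(failureMessage)
```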
### Why are the changes needed?
The formatted error message is easier to read and debug.
### Does this PR introduce _any_ user-facing change?
Yes, users see the clear error message in the application log.
(Note: I changed the test slightly to make it throw an exception intentionally. The test itself is fine.)
Before:
![2141619490903_ pic_hd](https://user-images.githubusercontent.com/16397174/116177970-5a092f00-a747-11eb-9a0f-017391e80c8b.jpg)
After:
![2151619490955_ pic_hd](https://user-images.githubusercontent.com/16397174/116177981-5ecde300-a747-11eb-90ef-fd16e906beeb.jpg)
### How was this patch tested?
Manually tested.
Closes#32356 from Ngone51/format-stage-error-message.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
### What changes were proposed in this pull request?
`WritablePartitionedIterator` is defined in `WritablePartitionedPairCollection.scala` and has two implementations, but the code of these two implementations is duplicated.
The main change of this PR is to turn `WritablePartitionedIterator` from a trait into a single concrete implementation class, since after removing the duplication only one implementation remains.
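The pattern of the refactoring, sketched with simplified, hypothetical names (not the real Spark signatures):
```
// Before: a trait whose two call sites each defined an anonymous implementation
// with essentially the same body.
trait PartitionedWriter {
  def writeNext(writer: (Int, String) => Unit): Unit
  def hasNext: Boolean
}

// After: the duplicated bodies are merged into a single concrete class, so the
// trait indirection is no longer needed.
class DefaultPartitionedWriter(it: Iterator[(Int, String)]) {
  private var cur: (Int, String) = if (it.hasNext) it.next() else null

  def writeNext(writer: (Int, String) => Unit): Unit = {
    writer(cur._1, cur._2)
    cur = if (it.hasNext) it.next() else null
  }

  def hasNext: Boolean = cur != null
}
```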
### Why are the changes needed?
Cleanup duplicate code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#32232 from LuciferYang/writable-partitioned-iterator.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
This change is to use repos.spark-packages.org instead of Bintray as the repository service for spark-packages.
### Why are the changes needed?
The change is needed because Bintray will no longer be available from May 1st.
### Does this PR introduce _any_ user-facing change?
This should be transparent to users who use SparkSubmit.
### How was this patch tested?
Manually tested by running spark-shell with `--packages`.
Closes#32346 from bozhang2820/replace-bintray.
Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>