### What changes were proposed in this pull request?
Just few log lines fixes which are logging the object name instead of the stage IDs
### Why are the changes needed?
This would make it easier later for debugging.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Just log messages. Existing tests should be enough
Closes#29279 from venkata91/SPARK-31418-follow-up.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is a follow-up of #28986.
This PR adds a config to switch allow/disallow to create `SparkContext` in executors.
- `spark.driver.allowSparkContextInExecutors`
### Why are the changes needed?
Some users or libraries actually create `SparkContext` in executors.
We shouldn't break their workloads.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to create `SparkContext` in executors with the config enabled.
### How was this patch tested?
More tests are added.
Closes#29278 from ueshin/issues/SPARK-32160/add_configs.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR removes a test added in SPARK-32175(#29002).
### Why are the changes needed?
That test is flaky. It can be mitigated by increasing the timeout but it would rather be simpler to remove the test.
See also the [discussion](https://github.com/apache/spark/pull/29002#issuecomment-666746857).
### Does this PR introduce _any_ user-facing change?
No.
Closes#29314 from sarutak/remove-flaky-test.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.
### Description of the problem
Make the block manager decommissioning test be less flaky
An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
the block manager won't be able to save the block. This condition is
easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
blocks will land there.
- No actual block migration happens here because the decommissioned
executor technically failed before it could even produce a block.
To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.
The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.
### Why are the changes needed?
I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.
### Does this PR introduce _any_ user-facing change?
No, unit test only change.
### How was this patch tested?
Github checks. Ran this test 100 times, 10 at a time in parallel in a script.
Closes#29226 from agrawaldevesh/block-manager-decom-flaky.
Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:
- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.
- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.
The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.
### Why are the changes needed?
Without them decommissioning a lot of executors at a time leads to job failures.
### Code overview
The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)
### Questions for reviewers
- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).
Closes#29014 from agrawaldevesh/decom_harden.
Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
So far, we fixed many stuffs in `core` module. This PR fixes the remaining UT failures in Scala 2.13.
- `OneApplicationResource.environmentInfo` will return a deterministic result for `sparkProperties`, `hadoopProperties`, `systemProperties`, and `classpathEntries`.
- `SubmitRestProtocolSuite` has Scala 2.13 answer in addition to the existing Scala 2.12 answer, and uses the expected answer based on the Scala runtime version.
### Why are the changes needed?
To support Scala 2.13.
### Does this PR introduce _any_ user-facing change?
Yes, `environmentInfo` is changed, but this fixes the indeterministic behavior.
### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action
- Scala 2.13: Do the following.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13
```
**BEFORE**
```
Tests: succeeded 2612, failed 3, canceled 1, ignored 8, pending 0
*** 3 TESTS FAILED ***
```
**AFTER**
```
Tests: succeeded 2615, failed 0, canceled 1, ignored 8, pending 0
All tests passed.
```
Closes#29298 from dongjoon-hyun/SPARK-32489.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to remove `java.ws.rs.NotFoundException` from two problematic `import` statements. All the other use cases are correct.
### Why are the changes needed?
In `StagesResource` and `OneApplicationResource`, there exist two `NotFoundException`s.
- javax.ws.rs.NotFoundException
- org.apache.spark.status.api.v1.NotFoundException
To use `org.apache.spark.status.api.v1.NotFoundException` correctly, we should not import `java.ws.rs.NotFoundException`. This causes UT failures in Scala 2.13 environment.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Scala 2.12: Pass the GitHub Action or Jenkins.
- Scala 2.13: Do the following manually.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.deploy.history.HistoryServerSuite
```
**BEFORE**
```
*** 4 TESTS FAILED ***
```
**AFTER**
```
*** 1 TEST FAILED ***
```
Closes#29293 from dongjoon-hyun/SPARK-32487.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to make `ResourceAllocator.availableAddrs` deterministic.
### Why are the changes needed?
Currently, this function returns indeterministically due to the underlying `HashMap`. So, the test case itself is creating a list `[0, 1, 2]` initially, but ends up with comparing `[2, 1, 0]`.
Not only this happens in the 3.0.0, but also this causes UT failures on Scala 2.13 environment.
### Does this PR introduce _any_ user-facing change?
Yes, but this fixes the in-deterministic behavior.
### How was this patch tested?
- Scala 2.12: This should pass the UT with the modified test case.
- Scala 2.13: This can be tested like the following (at least `JsonProtocolSuite`)
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.deploy.JsonProtocolSuite
```
**BEFORE**
```
*** 2 TESTS FAILED ***
```
**AFTER**
```
All tests passed.
```
Closes#29281 from dongjoon-hyun/SPARK-32476.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to make `JsonProtocol.accumulablesToJson` deterministic.
### Why are the changes needed?
Currently, `JsonProtocol.accumulablesToJson` is indeterministic. So, `JsonProtocolSuite` itself is also using mixed test cases in terms of `"Accumulables": [ ... ]`.
Not only this is indeterministic, but also this causes a UT failure in `JsonProtocolSuite` in Scala 2.13.
### Does this PR introduce _any_ user-facing change?
Yes. However, this is a fix on indeterministic behavior.
### How was this patch tested?
- Scala 2.12: Pass the GitHub Action or Jenkins.
- Scala 2.13: Do the following.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.util.JsonProtocolSuite
```
**BEFORE**
```
*** 1 TEST FAILED ***
```
**AFTER**
```
All tests passed.
```
Closes#29282 from dongjoon-hyun/SPARK-32477.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR changes the order between initialization for ExecutorPlugin and starting heartbeat thread in Executor.
### Why are the changes needed?
In the current master, heartbeat thread in a executor starts after plugin initialization so if the initialization takes long time, heartbeat is not sent to driver and the executor will be removed from cluster.
### Does this PR introduce _any_ user-facing change?
Yes. Plugins for executors will be allowed to take long time for initialization.
### How was this patch tested?
New testcase.
Closes#29002 from sarutak/fix-heartbeat-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
`spark.kryo.registrator` in 3.0 has a regression problem. From [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it supports multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').map(_.trim)
.filter(!_.isEmpty)
```
But it donsn't work in 3.0. Fix it by `toSequence` in `Kryo.scala`
### Why are the changes needed?
In previous Spark version (2.x), it supported multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').map(_.trim)
.filter(!_.isEmpty)
```
But it doesn't work in 3.0. It's should be a regression.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existed unit tests.
Closes#29123 from LantaoJin/SPARK-32283.
Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to use `command -v` in non-Window operating systems instead of executing the given command.
### Why are the changes needed?
1. `command` is POSIX-compatible
- **POSIX.1-2017**: https://pubs.opengroup.org/onlinepubs/9699919799/utilities/command.html
2. `command` is faster and safer than the direct execution
- `command` doesn't invoke another process.
```scala
scala> sys.process.Process("ls").run().exitValue()
LICENSE
NOTICE
bin
doc
lib
man
res1: Int = 0
```
3. The existing way behaves inconsistently.
- `rm` cannot be checked.
**AS-IS**
```scala
scala> sys.process.Process("rm").run().exitValue()
usage: rm [-f | -i] [-dPRrvW] file ...
unlink file
res0: Int = 64
```
**TO-BE**
```
Welcome to Scala 2.13.3 (OpenJDK 64-Bit Server VM, Java 1.8.0_262).
Type in expressions for evaluation. Or try :help.
scala> sys.process.Process(Seq("sh", "-c", s"command -v ls")).run().exitValue()
/bin/ls
val res1: Int = 0
```
4. The existing logic is already broken in Scala 2.13 environment because it hangs like the following.
```scala
$ bin/scala
Welcome to Scala 2.13.3 (OpenJDK 64-Bit Server VM, Java 1.8.0_262).
Type in expressions for evaluation. Or try :help.
scala> sys.process.Process("cat").run().exitValue() // hang here.
```
### Does this PR introduce _any_ user-facing change?
No. Although this is inside `main` source directory, this is used for testing purpose.
```
$ git grep testCommandAvailable | grep -v 'def testCommandAvailable'
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("wc"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable(envCommand))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(!TestUtils.testCommandAvailable("some_nonexistent_command"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable("cat"))
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala: assume(TestUtils.testCommandAvailable(envCommand))
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala: private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec)
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala: if (TestUtils.testCommandAvailable(pythonExec)) {
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala: skip = !TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("python"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("echo | sed"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala: assume(TestUtils.testCommandAvailable("/bin/bash"))
```
### How was this patch tested?
- **Scala 2.12**: Pass the Jenkins with the existing tests and one modified test.
- **Scala 2.13**: Do the following manually. It should pass instead of `hang`.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.rdd.PipedRDDSuite
...
Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes#29241 from dongjoon-hyun/SPARK-32443.
Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Support set off heap memory in `ExecutorResourceRequests`
### Why are the changes needed?
Support stage level scheduling
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT in `ResourceProfileSuite` and `DAGSchedulerSuite`
Closes#28972 from warrenzhu25/30794.
Authored-by: Warren Zhu <zhonzh@microsoft.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
This PR aims to increase the memory parameter in `BlockManagerSuite`'s worker decommission test cases.
### Why are the changes needed?
Scala 2.13 generates different Java objects and this affects Spark's `SizeEstimator/SizeTracker/SizeTrackingVector`. This causes UT failures like the following. If we decrease the values, those test cases fails in Scala 2.12, too.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.storage.BlockManagerSuite
...
- test decommission block manager should not be part of peers *** FAILED ***
0 did not equal 2 (BlockManagerSuite.scala:1869)
- test decommissionRddCacheBlocks should offload all cached blocks *** FAILED ***
0 did not equal 2 (BlockManagerSuite.scala:1884)
...
Tests: succeeded 81, failed 2, canceled 0, ignored 0, pending 0
*** 2 TESTS FAILED ***
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.storage.BlockManagerSuite
...
Tests: succeeded 83, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes#29238 from dongjoon-hyun/SPARK-32440.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Since Scala 2.13, `HashMap` is changed to become a final in the future and `.withDefault` is recommended. This PR aims to use `HashMap.withDefaultValue` instead of overriding manually in the test case.
- https://www.scala-lang.org/api/current/scala/collection/mutable/HashMap.html
```scala
deprecatedInheritance(message =
"HashMap wil be made final; use .withDefault for the common use case of computing a default value",
since = "2.13.0")
```
### Why are the changes needed?
In Scala 2.13, the existing code causes a failure because the default value function doesn't work correctly.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite
- aggregate *** FAILED ***
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 61.0 failed 1 times, most recent failure: Lost task 0.0 in stage 61.0 (TID 198, localhost, executor driver):
java.util.NoSuchElementException: key not found: a
```
### Does this PR introduce _any_ user-facing change?
No. This is a test case change.
### How was this patch tested?
1. **Scala 2.12:** Pass the Jenkins or GitHub with the existing tests.
2. **Scala 2.13**: Manually do the following.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite
...
Tests: succeeded 72, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes#29235 from dongjoon-hyun/SPARK-32438.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to initialize `numNonEmptyBlocks` in `HighlyCompressedMapStatus.readExternal`.
In Scala 2.12, this is initialized to `-1` via the following.
```scala
protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only
```
### Why are the changes needed?
In Scala 2.13, this causes several UT failures because `HighlyCompressedMapStatus.readExternal` doesn't initialize this field. The following is one example.
- org.apache.spark.scheduler.MapStatusSuite
```
MapStatusSuite:
- compressSize
- decompressSize
*** RUN ABORTED ***
java.lang.NoSuchFieldError: numNonEmptyBlocks
at org.apache.spark.scheduler.HighlyCompressedMapStatus.<init>(MapStatus.scala:181)
at org.apache.spark.scheduler.HighlyCompressedMapStatus$.apply(MapStatus.scala:281)
at org.apache.spark.scheduler.MapStatus$.apply(MapStatus.scala:73)
at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$8(MapStatusSuite.scala:64)
at scala.runtime.java8.JFunction1$mcVD$sp.apply(JFunction1$mcVD$sp.scala:18)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$7(MapStatusSuite.scala:61)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.scala:18)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.scheduler.MapStatusSuite.$anonfun$new$6(MapStatusSuite.scala:60)
...
```
### Does this PR introduce _any_ user-facing change?
No. This is a private class.
### How was this patch tested?
1. Pass the GitHub Action or Jenkins with the existing tests.
2. Test with Scala-2.13 with `MapStatusSuite`.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.scheduler.MapStatusSuite
...
MapStatusSuite:
- compressSize
- decompressSize
- MapStatus should never report non-empty blocks' sizes as 0
- large tasks should use org.apache.spark.scheduler.HighlyCompressedMapStatus
- HighlyCompressedMapStatus: estimated size should be the average non-empty block size
- SPARK-22540: ensure HighlyCompressedMapStatus calculates correct avgSize
- RoaringBitmap: runOptimize succeeded
- RoaringBitmap: runOptimize failed
- Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.
- SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE
Run completed in 7 seconds, 971 milliseconds.
Total number of tests run: 10
Suites: completed 2, aborted 0
Tests: succeeded 10, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes#29231 from dongjoon-hyun/SPARK-32436.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to speed up `MapStatus` deserialization by 5~18% with the latest RoaringBitmap `0.9.0` and new APIs. Note that we focus on `deserialization` time because `serialization` occurs once while `deserialization` occurs many times.
### Why are the changes needed?
The current version is too old. We had better upgrade it to get the performance improvement and bug fixes.
Although `MapStatusesSerDeserBenchmark` is synthetic, the benchmark result is updated with this patch.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the Jenkins or GitHub Action.
Closes#29233 from dongjoon-hyun/SPARK-ROAR.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
`UninterruptibleThread` running functionality is baked into `KafkaOffsetReader` which can be extracted into a class. The main intention is to simplify `KafkaOffsetReader` in order to make easier to solve SPARK-32032. In this PR I've made this extraction without functionality change.
### Why are the changes needed?
`UninterruptibleThread` running functionality is baked into `KafkaOffsetReader`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing + additional unit tests.
Closes#29187 from gaborgsomogyi/SPARK-32387.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
I wasn't able to reproduce the failure but the best I can tell is that the allocation manager timer triggers and call doRequest. The timeout is 10s so try to increase that to 30seconds.
### Why are the changes needed?
test failure
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
unit test
Closes#29225 from tgravescs/SPARK-32287.
Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Updates to scalatest 3.2.0. Though it looks large, it is 99% changes to the new location of scalatest classes.
### Why are the changes needed?
3.2.0+ has a fix that is required for Scala 2.13.3+ compatibility.
### Does this PR introduce _any_ user-facing change?
No, only affects tests.
### How was this patch tested?
Existing tests.
Closes#29196 from srowen/SPARK-32398.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In this change, when dynamic allocation is enabled instead of aborting immediately when there is an unschedulable taskset due to blacklisting, pass an event saying `SparkListenerUnschedulableTaskSetAdded` which will be handled by `ExecutorAllocationManager` and request more executors needed to schedule the unschedulable blacklisted tasks. Once the event is sent, we start the abortTimer similar to [SPARK-22148][SPARK-15815] to abort in the case when no new executors launched either due to max executors reached or cluster manager is out of capacity.
### Why are the changes needed?
This is an improvement. In the case when dynamic allocation is enabled, this would request more executors to schedule the unschedulable tasks instead of aborting the stage without even retrying upto spark.task.maxFailures times (in some cases not retrying at all). This is a potential issue with respect to Spark's Fault tolerance.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added unit tests both in ExecutorAllocationManagerSuite and TaskSchedulerImplSuite
Closes#28287 from venkata91/SPARK-31418.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
with the DecommissionExecutor message.
### Why are the changes needed?
The primary motivation is to know whether a decommissioned executor
would also be loosing shuffle files -- and thus it is important to know
whether the host would also be decommissioned.
In the absence of this PR, the existing code assumes that decommissioning an executor does not loose the whole host with it, and thus does not clear the shuffle state if external shuffle service is enabled. While this may hold in some cases (like K8s decommissioning an executor pod, or YARN container preemption), it does not hold in others like when the cluster is managed by a Standalone Scheduler (Master). This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message.
In the future, this `ExecutorDecommissionInfo` can be embellished for
knowing how long the executor has to live for scenarios like Cloud spot
kills (or Yarn preemption) and the like.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tweaked an existing unit test in `AppClientSuite`
Closes#29032 from agrawaldevesh/plumb_decom_info.
Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files.
In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and by right, should unregister its outputs. It doesn't because the epoch used to track the executor failure has not increased.
We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.
### Why are the changes needed?
Without the changes, the loss of a node could require two stage attempts to recover instead of one.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test. This test fails without the change and passes with it.
Closes#28848 from wypoon/SPARK-32003.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
### What changes were proposed in this pull request?
In PR, I propose to create input tables once before executing tests in `JDBCSuite` and `JdbcRDDSuite`. Currently, the table are created before every test in the test suites.
### Why are the changes needed?
This speed up the test suites up 30-40%.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Run the modified test suites
Closes#29176 from MaxGekk/jdbc-suite-before-all.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Catch the `RpcEnvStoppedException` and log debug it when stop is called for a `LocalSparkCluster`.
This PR also contains two small changes to fix the potential issues.
### Why are the changes needed?
Currently, there's always "RpcEnv already stopped" error if we exit spark-shell with local-cluster mode:
```
20/06/07 14:54:18 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:167)
at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:691)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
When we call stop on `StandaloneSchedulerBackend`, the backend will firstly send `UnregisterApplication` to `Master` and then call stop on `LocalSparkCluster` immediately. On the other side, `Master` will send messages to `Worker` when it receives `UnregisterApplication`. However, the rpcEnv of the `Worker` has been already stoped by the backend. Therefore, the error message shows when the `Worker` tries to handle the messages.
It's only an error on shutdown, users would not like to care about it. So we could hide it in debug log and this is also what we've done previously in #18547.
### Does this PR introduce _any_ user-facing change?
Yes, users will not see the error message after this PR.
### How was this patch tested?
Tested manually.
Closes#28746 from Ngone51/fix-spark-31922.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
The idea is to improve the performance of HybridStore by adding batch write support to LevelDB. #28412 introduces HybridStore. HybridStore will write data to InMemoryStore at first and use a background thread to dump data to LevelDB once the writing to InMemoryStore is completed. In the comments section of #28412 , mridulm mentioned using batch writing can improve the performance of this dumping process and he wrote the code of writeAll().
### Why are the changes needed?
I did the comparison of the HybridStore switching time between one-by-one write and batch write on an HDD disk. When the disk is free, the batch-write has around 25% improvement, and when the disk is 100% busy, the batch-write has 7x - 10x improvement.
when the disk is at 0% utilization:
| log size, jobs and tasks per job | original switching time, with write() | switching time with writeAll() |
| ---------------------------------- | ------------------------------------- | ------------------------------ |
| 133m, 400 jobs, 100 tasks per job | 16s | 13s |
| 265m, 400 jobs, 200 tasks per job | 30s | 23s |
| 1.3g, 1000 jobs, 400 tasks per job | 136s | 108s |
when the disk is at 100% utilization:
| log size, jobs and tasks per job | original switching time, with write() | switching time with writeAll() |
| --------------------------------- | ------------------------------------- | ------------------------------ |
| 133m, 400 jobs, 100 tasks per job | 116s | 17s |
| 265m, 400 jobs, 200 tasks per job | 251s | 26s |
I also ran some write related benchmarking tests on LevelDBBenchmark.java and measured the total time of writing 1024 objects. The tests were conducted when the disk is at 0% utilization.
| Benchmark test | with write(), ms | with writeAll(), ms |
| ------------------------ | ---------------- | ------------------- |
| randomUpdatesIndexed | 213.06 | 157.356 |
| randomUpdatesNoIndex | 57.869 | 35.439 |
| randomWritesIndexed | 298.854 | 229.274 |
| randomWritesNoIndex | 66.764 | 38.361 |
| sequentialUpdatesIndexed | 87.019 | 56.219 |
| sequentialUpdatesNoIndex | 61.851 | 41.942 |
| sequentialWritesIndexed | 94.044 | 56.534 |
| sequentialWritesNoIndex | 118.345 | 66.483 |
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested.
Closes#29149 from baohe-zhang/SPARK-32350.
Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
### What is changed?
This pull request adds the ability to migrate shuffle files during Spark's decommissioning. The design document associated with this change is at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE .
To allow this change the `MapOutputTracker` has been extended to allow the location of shuffle files to be updated with `updateMapOutput`. When a shuffle block is put, a block update message will be sent which triggers the `updateMapOutput`.
Instead of rejecting remote puts of shuffle blocks `BlockManager` delegates the storage of shuffle blocks to it's shufflemanager's resolver (if supported). A new, experimental, trait is added for shuffle resolvers to indicate they handle remote putting of blocks.
The existing block migration code is moved out into a separate file, and a producer/consumer model is introduced for migrating shuffle files from the host as quickly as possible while not overwhelming other executors.
### Why are the changes needed?
Recomputting shuffle blocks can be expensive, we should take advantage of our decommissioning time to migrate these blocks.
### Does this PR introduce any user-facing change?
This PR introduces two new configs parameters, `spark.storage.decommission.shuffleBlocks.enabled` & `spark.storage.decommission.rddBlocks.enabled` that control which blocks should be migrated during storage decommissioning.
### How was this patch tested?
New unit test & expansion of the Spark on K8s decom test to assert that decommisioning with shuffle block migration means that the results are not recomputed even when the original executor is terminated.
This PR is a cleaned-up version of the previous WIP PR I made https://github.com/apache/spark/pull/28331 (thanks to attilapiros for his very helpful reviewing on it :)).
Closes#28708 from holdenk/SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up.
Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Co-authored-by: Attila Zsolt Piros <attilazsoltpiros@apiros-mbp16.lan>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
This PR adds functionality to consider the running tasks on decommission executors based on some config.
In spark-on-cloud , we sometimes already know that an executor won't be alive for more than fix amount of time. Ex- In AWS Spot nodes, once we get the notification, we know that a node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidate for speculation.
### Why are the changes needed?
Currently when an executor is decommission, we stop scheduling new tasks on those executors but the already running tasks keeps on running on them. Based on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers gives different timeouts before they take away the nodes. For Ex- In case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and take better decisions about speculating the already running tasks on decommission executors.
### Does this PR introduce _any_ user-facing change?
Yes. This PR adds a new config "spark.executor.decommission.killInterval" which they can explicitly set based on the cloud environment where they are running.
### How was this patch tested?
Added UT.
Closes#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.
Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
This PR allows an external agent to inform the Master that certain hosts
are being decommissioned.
### Why are the changes needed?
The current decommissioning is triggered by the Worker getting getting a SIGPWR
(out of band possibly by some cleanup hook), which then informs the Master
about it. This approach may not be feasible in some environments that cannot
trigger a clean up hook on the Worker. In addition, when a large number of
worker nodes are being decommissioned then the master will get a flood of
messages.
So we add a new post endpoint `/workers/kill` on the MasterWebUI that allows an
external agent to inform the master about all the nodes being decommissioned in
bulk. The list of nodes is specified by providing a list of hostnames. All workers on those
hosts will be decommissioned.
This API is merely a new entry point into the existing decommissioning
logic. It does not change how the decommissioning request is handled in
its core.
### Does this PR introduce _any_ user-facing change?
Yes, a new endpoint `/workers/kill` is added to the MasterWebUI. By default only
requests originating from an IP address local to the MasterWebUI are allowed.
### How was this patch tested?
Added unit tests
Closes#29015 from agrawaldevesh/master_decom_endpoint.
Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Support fetching taskList by status as below:
```
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList?status=failed
```
### Why are the changes needed?
When there're large number of tasks in one stage, current api is hard to get taskList by status
### Does this PR introduce _any_ user-facing change?
Yes. Updated monitoring doc.
### How was this patch tested?
Added tests in `HistoryServerSuite`
Closes#28942 from warrenzhu25/SPARK-32125.
Authored-by: Warren Zhu <zhonzh@microsoft.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
This PR will remove references to these "blacklist" and "whitelist" terms besides the blacklisting feature as a whole, which can be handled in a separate JIRA/PR.
This touches quite a few files, but the changes are straightforward (variable/method/etc. name changes) and most quite self-contained.
### Why are the changes needed?
As per discussion on the Spark dev list, it will be beneficial to remove references to problematic language that can alienate potential community members. One such reference is "blacklist" and "whitelist". While it seems to me that there is some valid debate as to whether these terms have racist origins, the cultural connotations are inescapable in today's world.
### Does this PR introduce _any_ user-facing change?
In the test file `HiveQueryFileTest`, a developer has the ability to specify the system property `spark.hive.whitelist` to specify a list of Hive query files that should be tested. This system property has been renamed to `spark.hive.includelist`. The old property has been kept for compatibility, but will log a warning if used. I am open to feedback from others on whether keeping a deprecated property here is unnecessary given that this is just for developers running tests.
### How was this patch tested?
Existing tests should be suitable since no behavior changes are expected as a result of this PR.
Closes#28874 from xkrogen/xkrogen-SPARK-32036-rename-blacklists.
Authored-by: Erik Krogen <ekrogen@linkedin.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
Add a new class HybridStore to make the history server faster when loading event files. When rebuilding the application state from event logs, HybridStore will write data to InMemoryStore at first and use a background thread to dump data to LevelDB once the writing to InMemoryStore is completed. HybridStore is to make content serving faster by using more memory. It's only safe to enable it when the cluster is not having a heavy load.
### Why are the changes needed?
HybridStore can greatly reduce the event logs loading time, especially for large log files. In general, it has 4x - 6x UI loading speed improvement for large log files. The detailed result is shown in comments.
### Does this PR introduce any user-facing change?
This PR adds new configs `spark.history.store.hybridStore.enabled` and `spark.history.store.hybridStore.maxMemoryUsage`.
### How was this patch tested?
A test suite for HybridStore is added. I also manually tested it on 3.1.0 on mac os.
This is a follow-up for the work done by Hieu Huynh in 2019.
Closes#28412 from baohe-zhang/SPARK-31608.
Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This PR aims to drop Python 2.7, 3.4 and 3.5.
Roughly speaking, it removes all the widely known Python 2 compatibility workarounds such as `sys.version` comparison, `__future__`. Also, it removes the Python 2 dedicated codes such as `ArrayConstructor` in Spark.
### Why are the changes needed?
1. Unsupport EOL Python versions
2. Reduce maintenance overhead and remove a bit of legacy codes and hacks for Python 2.
3. PyPy2 has a critical bug that causes a flaky test, SPARK-28358 given my testing and investigation.
4. Users can use Python type hints with Pandas UDFs without thinking about Python version
5. Users can leverage one latest cloudpickle, https://github.com/apache/spark/pull/28950. With Python 3.8+ it can also leverage C pickle.
### Does this PR introduce _any_ user-facing change?
Yes, users cannot use Python 2.7, 3.4 and 3.5 in the upcoming Spark version.
### How was this patch tested?
Manually tested and also tested in Jenkins.
Closes#28957 from HyukjinKwon/SPARK-32138.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This change replaces the world slave with alternatives matching the context.
### Why are the changes needed?
There is no need to call things slave, we might as well use better clearer names.
### Does this PR introduce _any_ user-facing change?
Yes, the ouput JSON does change. To allow backwards compatibility this is an additive change.
The shell scripts for starting & stopping workers are renamed, and for backwards compatibility old scripts are added to call through to the new ones while printing a deprecation message to stderr.
### How was this patch tested?
Existing tests.
Closes#28864 from holdenk/SPARK-32004-drop-references-to-slave.
Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
When last partition's splitFile's split size is larger then maxSize, this partition will be lost
Origin logic error like below as 1, 2, 3, 4, 5
```scala
// 5. since index = partition.size now, jump out of the loop , then the last partition is lost since we won't call updatePartition() again.
while (index < partitions.size) {
// 1. we assume that when index = partitions.length -1(the last partition)
val partition = partitions(index)
val fileSplit =
partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
val splitSize = fileSplit.getLength
// 2. if this partition's splitSize > maxSize
if (currentSum + splitSize < maxSize) {
addPartition(partition, splitSize)
index += 1
if (index == partitions.size) {
updateGroups
}
} else {
// 3. if currentGroup.partitions.size >0, this situation is possiable
if (currentGroup.partitions.size == 0) {
addPartition(partition, splitSize)
index += 1
} else {
// 4. then will just call updateGroups() here first, and index won't update in group
updateGroups
}
}
}
groups.toArray
}
}
```
### Why are the changes needed?
Fix bug
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Manual code review.
Closes#27988 from AngersZhuuuu/SPARK-31226.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
The purpose of this PR is to partly resolve SPARK-29292, and fully resolve SPARK-30010, which should allow Spark to compile vs Scala 2.13 in Spark Core and up through GraphX (not SQL, Streaming, etc).
Note that we are not trying to determine here whether this makes Spark work on 2.13 yet, just compile, as a prerequisite for assessing test outcomes. However, of course, we need to ensure that the change does not break 2.12.
The changes are, in the main, adding .toSeq and .toMap calls where mutable collections / maps are returned as Seq / Map, which are immutable by default in Scala 2.13. The theory is that it should be a no-op for Scala 2.12 (these return themselves), and required for 2.13.
There are a few non-trivial changes highlighted below.
In particular, to get Core to compile, we need to resolve SPARK-30010 which removes a deprecated SparkConf method
### Why are the changes needed?
Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.
### Does this PR introduce _any_ user-facing change?
Yes, removal of the deprecated SparkConf.setAll overload, which isn't legal in Scala 2.13 anymore.
### How was this patch tested?
Existing tests. (2.13 was not _tested_; this is about getting it to compile without breaking 2.12)
Closes#28971 from srowen/SPARK-29292.1.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR proposes to change the HistoryServer to redirect to the history page when we access to /history without application id.
### Why are the changes needed?
In the current master, status code 400 will be returned when we access to /history.
So I wonder it's better to redirect to the history page for the better UX.
### Does this PR introduce _any_ user-facing change?
Yes. In the current master, if we access to /history without application id, we will see like the following page.
![history-400](https://user-images.githubusercontent.com/4736016/86649650-e9105380-c01c-11ea-93bb-78fd8d2e6f7b.png)
After this change applied, we will be redirected to the history page.
### How was this patch tested?
New test added.
Closes#29016 from sarutak/history-redirect.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to run the Spark tests in Github Actions.
To briefly explain the main idea:
- Reuse `dev/run-tests.py` with SBT build
- Reuse the modules in `dev/sparktestsupport/modules.py` to test each module
- Pass the modules to test into `dev/run-tests.py` directly via `TEST_ONLY_MODULES` environment variable. For example, `pyspark-sql,core,sql,hive`.
- `dev/run-tests.py` _does not_ take the dependent modules into account but solely the specified modules to test.
Another thing to note might be `SlowHiveTest` annotation. Running the tests in Hive modules takes too much so the slow tests are extracted and it runs as a separate job. It was extracted from the actual elapsed time in Jenkins:
![Screen Shot 2020-07-09 at 7 48 13 PM](https://user-images.githubusercontent.com/6477701/87050238-f6098e80-c238-11ea-9c4a-ab505af61381.png)
So, Hive tests are separated into to jobs. One is slow test cases, and the other one is the other test cases.
_Note that_ the current GitHub Actions build virtually copies what the default PR builder on Jenkins does (without other profiles such as JDK 11, Hadoop 2, etc.). The only exception is Kinesis https://github.com/apache/spark/pull/29057/files#diff-04eb107ee163a50b61281ca08f4e4c7bR23
### Why are the changes needed?
Last week and onwards, the Jenkins machines became very unstable for many reasons:
- Apparently, the machines became extremely slow. Almost all tests can't pass.
- One machine (worker 4) started to have the corrupt `.m2` which fails the build.
- Documentation build fails time to time for an unknown reason in Jenkins machine specifically. This is disabled for now at https://github.com/apache/spark/pull/29017.
- Almost all PRs are basically blocked by this instability currently.
The advantages of using Github Actions:
- To avoid depending on few persons who can access to the cluster.
- To reduce the elapsed time in the build - we could split the tests (e.g., SQL, ML, CORE), and run them in parallel so the total build time will significantly reduce.
- To control the environment more flexibly.
- Other contributors can test and propose to fix Github Actions configurations so we can distribute this build management cost.
Note that:
- The current build in Jenkins takes _more than 7 hours_. With Github actions it takes _less than 2 hours_
- We can now control the environments especially for Python easily.
- The test and build look more stable than the Jenkins'.
### Does this PR introduce _any_ user-facing change?
No, dev-only change.
### How was this patch tested?
Tested at https://github.com/HyukjinKwon/spark/pull/4Closes#29057 from HyukjinKwon/migrate-to-github-actions.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In IPv6 scenario, the current logic to split hostname and port is not correct.
### Why are the changes needed?
to support IPV6 deployment scenario
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT and IPV6 spark deployment with yarn
Closes#28931 from PavithraRamachandran/ipv6_issue.
Authored-by: Pavithraramachandran <pavi.rams@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR adds the check to see whether the executor is lost (by asking the `CoarseGrainedSchedulerBackend`) after timeout error raised in `BlockManagerMasterEndponit` due to removing blocks(e.g. RDD, broadcast, shuffle). If the executor is lost, we will ignore the error. Otherwise, throw the error.
### Why are the changes needed?
When removing blocks(e.g. RDD, broadcast, shuffle), `BlockManagerMaserEndpoint` will make RPC calls to each known `BlockManagerSlaveEndpoint` to remove the specific blocks. The PRC call sometimes could end in a timeout when the executor has been lost, but only notified the `BlockManagerMasterEndpoint` after the removing call has already happened. The timeout error could therefore fail the whole job.
In this case, we actually could just ignore the error since those blocks on the lost executor could be considered as removed already.
### Does this PR introduce _any_ user-facing change?
Yes. In case of users hits this issue, they will have the job executed successfully instead of throwing the exception.
### How was this patch tested?
Added unit tests.
Closes#28924 from Ngone51/ignore-timeout-error-for-inactive-executor.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to disallow to create `SparkContext` in executors, e.g., in UDFs.
### Why are the changes needed?
Currently executors can create SparkContext, but shouldn't be able to create it.
```scala
sc.range(0, 1).foreach { _ =>
new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
}
```
### Does this PR introduce _any_ user-facing change?
Yes, users won't be able to create `SparkContext` in executors.
### How was this patch tested?
Addes tests.
Closes#28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch fixes the test failure due to the missing when statements for destination path. Note that it didn't fail on master branch, because 245aee9 got rid of size call in destination path, but still good to not depend on 245aee9.
### Why are the changes needed?
The build against branch-3.0 / branch-2.4 starts to fail after merging SPARK-32024 (#28859) and this patch will fix it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Ran modified UT against master / branch-3.0 / branch-2.4.
Closes#29046 from HeartSaVioR/QUICKFIX-SPARK-32024.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Enable test("static relative links are prefixed with uiRoot (spark.ui.proxyBase)")
### Why are the changes needed?
In Jira, the failed test is another one test("ajax rendered relative links are prefixed with uiRoot (spark.ui.proxyBase)"). This test has been fixed in 6a895d0
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Fix UT
Closes#28970 from warrenzhu25/31723.
Authored-by: Warren Zhu <zhonzh@microsoft.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Update ApplicationStoreInfo.size to real size during HistoryServerDiskManager initializing.
### Why are the changes needed?
This PR is for fixing bug [32024](https://issues.apache.org/jira/browse/SPARK-32024). We found after history server restart, below error would randomly happen: "java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, delta = -***)" from `HistoryServerDiskManager`.
![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG)
**Cause**: Reading data from level db would trigger table file compaction, which may also trigger size of level db directory changes. This size change may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When service restarts, `currentUsage` is calculated from real directory size, but `ApplicationStoreInfo` are loaded from leveldb, then `currentUsage` may be less then sum of `ApplicationStoreInfo.size`. In `makeRoom()` function, `ApplicationStoreInfo.size` is used to update usage. Then `currentUsage` becomes negative after several round of `release()` and `lease()` (`makeRoom()`).
**Reproduce**: we can reproduce this issue in dev environment by reducing config value of "spark.history.retainedApplications" and "spark.history.store.maxDiskUsage" to some small values. Here are steps: 1. start history server, load some applications and access some pages (maybe "stages" page to trigger leveldb compaction). 2. restart HS, and refresh pages.
I also added an UT to simulate this case in `HistoryServerDiskManagerSuite`.
**Benefit**: this change would help improve history server reliability.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add unit test and manually tested it.
Closes#28859 from zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize.
Authored-by: Zhen Li <zhli@microsoft.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version.
### Why are the changes needed?
Follow up PR for #28941.
### Does this PR introduce _any_ user-facing change?
When we use the Spark version 3.0 history server reading the event log written by the old Spark version, we use the invalid value -2 to fill the map index.
### How was this patch tested?
Existing UT.
Closes#28965 from xuanyuanking/follow-up.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Use Files.createDirectory() to create local directory instead of File.mkdir() in DiskBlockManager.
Many times, we will see such error log information like "Failed to create local dir in xxxxxx". But there is no clear information indicating why the directory creation failed.
When Files.createDirectory() fails to create a local directory, it can give specific error information for subsequent troubleshooting(also throws IOException).
### Why are the changes needed?
Throw clear error message when creating directory fails.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`DiskBlockManagerSuite`
Closes#28997 from sidedoorleftroad/SPARK-32172.
Authored-by: sidedoorleftroad <sidedoorleftroad@163.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR aims to reduce the required test resources in WorkerDecommissionExtendedSuite.
### Why are the changes needed?
When Jenkins farms is crowded, the following failure happens currently [here](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/890/testReport/junit/org.apache.spark.scheduler/WorkerDecommissionExtendedSuite/Worker_decommission_and_executor_idle_timeout/)
```
java.util.concurrent.TimeoutException: Can't find 20 executors before 60000 milliseconds elapsed
at org.apache.spark.TestUtils$.waitUntilExecutorsUp(TestUtils.scala:326)
at org.apache.spark.scheduler.WorkerDecommissionExtendedSuite.$anonfun$new$2(WorkerDecommissionExtendedSuite.scala:45)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the Jenkins.
Closes#29001 from dongjoon-hyun/SPARK-32100-2.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR changes `webui.css` to fix a style issue on moving mouse cursor on the Spark logo.
### Why are the changes needed?
In the webui, the Spark logo is on the top right side.
When we move mouse cursor on the logo, a weird underline appears near the logo.
<img width="209" alt="logo_with_line" src="https://user-images.githubusercontent.com/4736016/86542828-3c6a9f00-bf54-11ea-9b9d-cc50c12c2c9b.png">
### Does this PR introduce _any_ user-facing change?
Yes. After this change applied, no more weird line shown even if mouse cursor moves on the logo.
<img width="207" alt="removed-line-from-logo" src="https://user-images.githubusercontent.com/4736016/86542877-98cdbe80-bf54-11ea-8695-ee39689673ab.png">
### How was this patch tested?
By moving mouse cursor on the Spark logo and confirmed no more weird line there.
Closes#29003 from sarutak/fix-logo-underline.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>