Commit graph

7746 commits

neko 1bfcb51eeb [SPARK-33132][WEBUI] Make formatBytes return 0.0 B for negative input instead of NaN
### What changes were proposed in this pull request?
When the bytesRead metric is negative, `formatBytes` in `ui.js` should just return `0.0 B` to avoid a `NaN Undefined` result.
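A hedged Scala transcription of the guard, for illustration only; the actual fix is a few lines of JavaScript in `ui.js`, and the exact unit labels below are assumptions:

```scala
// Illustration only: the real change is in ui.js. Negative (or zero) input now
// short-circuits to "0.0 B" instead of propagating NaN into the unit lookup.
def formatBytes(bytes: Double): String = {
  if (bytes <= 0) return "0.0 B"
  val k = 1024d
  val units = Array("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB")
  val i = math.min(units.length - 1,
    math.max(0, math.floor(math.log(bytes) / math.log(k)).toInt))
  f"${bytes / math.pow(k, i)}%.1f ${units(i)}"
}
```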

### Why are the changes needed?
Strengthens the parameter validation to improve the metric display in the Summary Metrics section of the Spark Stage UI.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
It's a small change; manual testing only.

Closes #30030 from akiyamaneko/formatBytes_NaN.

Authored-by: neko <echohlne@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-13 09:29:05 -07:00
Holden Karau 7696ca5673 [SPARK-32881][CORE] Catch some race condition errors and log them more clearly
### What changes were proposed in this pull request?

Decommissioning can run out of time, resulting in race conditions. These race conditions produce confusing error messages but have no negative impact.

### Why are the changes needed?

The NPE and element-missing errors in the log can be misleading.

### Does this PR introduce _any_ user-facing change?
Logs change.

### How was this patch tested?
Existing tests pass.

Closes #29992 from holdenk/SPARK-32881-error-messaging-on-decom-race-messages.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-10 16:24:50 -07:00
yi.wu 0b326d5327 [SPARK-32857][CORE] Fix flaky o.a.s.s.BarrierTaskContextSuite.throw exception if the number of barrier() calls are not the same on every task
### What changes were proposed in this pull request?

Fix the flaky test.

### Why are the changes needed?

The test is flaky: `Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown`.

Check the full error stack [here](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128548/testReport/org.apache.spark.scheduler/BarrierTaskContextSuite/throw_exception_if_the_number_of_barrier___calls_are_not_the_same_on_every_task/).

By analyzing the log below, I found that task 0 hadn't reached the second `context.barrier()` when the other three tasks had already raised sync timeout exceptions from the first `context.barrier()`. The timeout exceptions were caught by the `try...catch`. Then, each task started another round of barrier sync from the second `context.barrier()` and completed the sync successfully. (A minimal sketch of this barrier pattern follows the log.)

```scala
20/09/10 20:54:48.821 dispatcher-event-loop-10 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:48.822 dispatcher-event-loop-10 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 2, current progress: 1/4.
20/09/10 20:54:48.826 dispatcher-BlockManagerMaster INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:38420 (size: 2.2 KiB, free: 546.3 MiB)
20/09/10 20:54:48.908 dispatcher-event-loop-12 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:48.909 dispatcher-event-loop-12 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 2/4.
20/09/10 20:54:48.959 dispatcher-event-loop-11 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:48.960 dispatcher-event-loop-11 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 3, current progress: 3/4.
20/09/10 20:54:49.616 dispatcher-CoarseGrainedScheduler INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 4 slots, while the total number of available slots is 0.
20/09/10 20:54:49.899 dispatcher-event-loop-15 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:49.900 dispatcher-event-loop-15 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/4.
20/09/10 20:54:49.965 dispatcher-event-loop-13 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:49.966 dispatcher-event-loop-13 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 3, current progress: 2/4.
20/09/10 20:54:50.112 dispatcher-event-loop-16 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:50.113 dispatcher-event-loop-16 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current progress: 3/4.
20/09/10 20:54:50.609 dispatcher-CoarseGrainedScheduler INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 4 slots, while the total number of available slots is 0.
20/09/10 20:54:50.826 dispatcher-event-loop-17 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
20/09/10 20:54:50.827 dispatcher-event-loop-17 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 2, current progress: 4/4.
20/09/10 20:54:50.827 dispatcher-event-loop-17 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully.
```
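For context, a minimal sketch of the barrier pattern this test exercises (not the suite's actual code), assuming a local 4-slot master:

```scala
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext, SparkException}

// Every task must reach barrier() the same number of times; a sync that times out
// throws SparkException, which is caught before the tasks attempt the next sync.
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("barrier-sketch"))
sc.makeRDD(1 to 4, numSlices = 4).barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  try {
    context.barrier()          // first sync round; slow tasks can make this time out
  } catch {
    case _: SparkException =>  // swallowed, so the task falls through...
  }
  context.barrier()            // ...and joins the next sync round here
  iter
}.collect()
sc.stop()
```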

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the test and ran it a hundred times without failure (previously, there could be several failures).

Closes #29732 from Ngone51/fix-flaky-throw-exception.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-06 14:18:37 -07:00
Michael Munday b5e4b8c73e [SPARK-27428][CORE][TEST] Increase receive buffer size used in StatsdSinkSuite
### What changes were proposed in this pull request?

Increase size of socket receive buffer in these tests.

### Why are the changes needed?

The socket receive buffer size set in this test was too small for
the StatsdSinkSuite tests to run reliably on some systems. For a
test in this suite to run reliably the buffer needs to be large
enough to hold all the data in the packets being sent in a test
along with any additional kernel or protocol overhead. The amount
of kernel overhead per packet can vary from system to system but is
typically far higher than the protocol overhead.

If the receive buffer is too small and fills up then packets are
silently dropped. This leads to the test failing with a timeout.

If the socket defaults to a larger receive buffer (normally true)
then we should keep that size.

As well as increasing the minimum buffer size I've also decoupled
the datagram packet buffer size from the receive buffer size. The
receive buffer should in general be far larger to account for the
fact that multiple packets might be buffered, as well as the
aforementioned overhead. Any truncated data in individual packets
will be picked up by the tests.
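A minimal sketch of that buffer handling (not the suite's exact code; the 4 MiB minimum is an assumption for illustration):

```scala
import java.net.DatagramSocket

val requestedMinimum = 4 * 1024 * 1024  // assumed minimum for this sketch
val socket = new DatagramSocket()
// Only grow the receive buffer; if the OS default is already larger, keep it.
if (socket.getReceiveBufferSize < requestedMinimum) {
  socket.setReceiveBufferSize(requestedMinimum)
}
// The per-packet buffer can stay much smaller than the socket's receive buffer;
// a truncated packet would surface as a test failure rather than a silent drop.
println(s"effective receive buffer: ${socket.getReceiveBufferSize} bytes")
socket.close()
```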

### Does this PR introduce _any_ user-facing change?

No, this only affects the tests.

### How was this patch tested?
Existing tests on IBM Z and x86.

Closes #29819 from mundaym/fix-statsd.

Authored-by: Michael Munday <mike.munday@ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-10-06 08:31:06 -05:00
Holden Karau db420f79cc [SPARK-33049][CORE] Decommission shuffle block test is flaky
### What changes were proposed in this pull request?

Increase the listener bus event queue length, and synchronize the addition of modified blocks to the array list.
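A toy sketch of the synchronization part (assumed names, not the suite's code):

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

// Block-updated events can be delivered concurrently, so guard the shared buffer.
class BlockUpdateRecorder extends SparkListener {
  private val updatedBlocks = mutable.ArrayBuffer.empty[String]

  override def onBlockUpdated(update: SparkListenerBlockUpdated): Unit =
    updatedBlocks.synchronized {
      updatedBlocks += update.blockUpdatedInfo.blockId.name
    }

  def snapshot(): Seq[String] = updatedBlocks.synchronized { updatedBlocks.toList }
}
```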

### Why are the changes needed?

This test appears flaky in Jenkins (cannot reproduce locally). Given that the index file made it through, and the index file is only transferred after the data file, the only two reasons I could come up with for an intermittent failure here are the listener bus dropping a message or the two block-change messages being received at the same time.

### Does this PR introduce _any_ user-facing change?

No (test only).

### How was this patch tested?

The tests still pass on my machine, but they did before as well. We'll need to run this through Jenkins a few times first.

Closes #29929 from holdenk/fix-.BlockManagerDecommissionIntegrationSuite.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-03 15:14:48 -07:00
Bo Yang 1299c8a81d [SPARK-33037][SHUFFLE] Remove knownManagers to support user's custom shuffle manager plugin
### What changes were proposed in this pull request?

Spark has a hardcoded list of known shuffle managers, which currently contains two values. It does not include a user's custom shuffle manager set through the Spark config "spark.shuffle.manager".

We hit this issue when setting "spark.shuffle.manager" to our own shuffle manager plugin (the Uber Remote Shuffle Service implementation, https://github.com/uber/RemoteShuffleService). Other users will hit the same issue when they implement their own shuffle managers.

It is better to remove the hardcoded knownManagers list to support users' custom shuffle manager implementations.
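For illustration, the use case this enables (the class name below is hypothetical):

```scala
import org.apache.spark.SparkConf

// Any ShuffleManager implementation on the classpath can now be plugged in;
// previously, values outside the hardcoded knownManagers list were rejected.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.example.shuffle.RemoteShuffleManager") // hypothetical class
```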

### Why are the changes needed?

Spark has a shuffle manager API to support custom shuffle manager implementations. The hardcoded known-managers list does not account for a shuffle manager config value set by the user, so it is better to remove it.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Current Spark unit test already covers the code path.

Closes #29916 from boy-uber/knownManagers.

Lead-authored-by: Bo Yang <boy@uber.com>
Co-authored-by: Bo Yang <boy-uber@users.noreply.github.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-10-02 20:26:46 -07:00
Gabor Somogyi 991f7e81d4 [SPARK-32001][SQL] Create JDBC authentication provider developer API
### What changes were proposed in this pull request?
At the moment only the baked-in JDBC connection providers can be used, but there is a need to support additional databases and use cases. In this PR I'm proposing a new developer API named `JdbcConnectionProvider`. To show how an external JDBC connection provider can be implemented, I've created an example [here](https://github.com/gaborgsomogyi/spark-jdbc-connection-provider); a rough standalone sketch also follows the change list below.

The PR contains the following changes:
* Added connection provider developer API
* Made the JDBC connection providers' constructors no-arg => needed to load them w/ the service loader
* Connection providers are now loaded w/ service loader
* Added tests to load providers independently
* Moved `SecurityConfigurationLock` into a central place because other areas will change global JVM security config
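A rough, standalone sketch of what such a provider does. It deliberately does not extend the real `JdbcConnectionProvider` abstract class, whose exact members are not spelled out in this description; the class name, method shapes, and URL prefix below are assumptions (see the linked example repository for the real contract):

```scala
import java.sql.{Connection, Driver}
import java.util.Properties

// Standalone sketch: decide whether this provider handles a given JDBC URL and,
// if so, perform custom authentication before handing back a connection.
class MyDatabaseConnectionProvider {                      // hypothetical name
  def canHandle(driver: Driver, options: Map[String, String]): Boolean =
    options.get("url").exists(_.startsWith("jdbc:mydb:")) // hypothetical URL prefix

  def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    // custom authentication (tokens, Kerberos login, ...) would happen here
    driver.connect(options("url"), new Properties())
  }
}
// Real providers are discovered via the service loader, i.e. a
// META-INF/services entry naming the implementation class.
```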

### Why are the changes needed?
Currently there is no possibility for custom authentication.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
* Existing + additional unit tests
* Docker integration tests
* Tested manually the newly created external JDBC connection provider

Closes #29024 from gaborgsomogyi/SPARK-32001.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-02 13:04:40 +09:00
Shruti Gumma 8657742ec7 [SPARK-32996][WEB-UI][FOLLOWUP] Move ExecutorSummarySuite to proper path
### What changes were proposed in this pull request?

This change moves the test file added in #29872 to the proper path.

### Why are the changes needed?

ExecutorSummarySuite.scala should be in core/src/test/scala instead of core/src/test/java.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #29926 from shrutig/SPARK-32996.

Authored-by: Shruti Gumma <shruti_gumma@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-10-01 16:33:19 -07:00
Peter Toth 28ed3a512a [SPARK-32723][WEBUI] Upgrade to jQuery 3.5.1
### What changes were proposed in this pull request?
Upgrade to the latest available version of jQuery (3.5.1).

### Why are the changes needed?
There are some CVEs reported (CVE-2020-11022, CVE-2020-11023) affecting older versions of jQuery. Although the Spark UI is read-only and those CVEs don't seem to affect Spark, using the latest version of this library helps when handling vulnerability reports from security scans.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual tests and checked the jQuery 3.5 upgrade guide.

Closes #29902 from peter-toth/SPARK-32723-upgrade-to-jquery-3.5.1.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-30 21:30:17 -07:00
angerszhu 0b5a379c1f [SPARK-33023][CORE] Judge path of Windows need add condition Utils.isWindows
### What changes were proposed in this pull request?
According to the discussion at https://github.com/apache/spark/pull/29881#discussion_r496648397, we need to add the condition `Utils.isWindows`.
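A hedged sketch of the stricter check, using a local stand-in for Spark's internal `Utils.isWindows` flag:

```scala
// Stand-in for Utils.isWindows; the real code uses Spark's internal flag.
val isWindows = System.getProperty("os.name").toLowerCase.startsWith("windows")

// Only treat something like "C:\dir" or "C:/dir" as a Windows drive path when
// actually running on Windows.
def looksLikeWindowsDrivePath(path: String): Boolean =
  isWindows && path.length > 1 && path(0).isLetter && path(1) == ':'
```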

### Why are the changes needed?
Adds a stricter condition for judging whether a path is a Windows path.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No

Closes #29909 from AngersZhuuuu/SPARK-33023.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-30 19:24:50 -07:00
Dongjoon Hyun cc06266ade [SPARK-33019][CORE] Use spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 by default
### What changes were proposed in this pull request?

Apache Spark 3.1's default Hadoop profile is `hadoop-3.2`. Instead of having a warning documentation, this PR aims to use a consistent and safer version of Apache Hadoop file output committer algorithm which is `v1`. This will prevent a silent correctness regression during migration from Apache Spark 2.4/3.0 to Apache Spark 3.1.0. Of course, if there is a user-provided configuration, `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2`, that will be used still.

### Why are the changes needed?

Apache Spark provides multiple distributions with Hadoop 2.7 and Hadoop 3.2. `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` depends on the Hadoop version. Apache Hadoop 3.0 switches the default algorithm from `v1` to `v2` and now there exists a discussion to remove `v2`. We had better provide a consistent default behavior of `v1` across various Spark distributions.

- [MAPREDUCE-7282](https://issues.apache.org/jira/browse/MAPREDUCE-7282) MR v2 commit algorithm should be deprecated and not the default

### Does this PR introduce _any_ user-facing change?

Yes. This changes the default behavior. Users can override this conf.

### How was this patch tested?

Manual.

**BEFORE (spark-3.0.1-bin-hadoop3.2)**
```scala
scala> sc.version
res0: String = 3.0.1

scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res1: String = 2
```

**AFTER**
```scala
scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res0: String = 1
```

Closes #29895 from dongjoon-hyun/SPARK-DEFAUT-COMMITTER.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-29 12:02:45 -07:00
Akshat Bordia 7766fd13c9 [MINOR][DOCS] Fixing log message for better clarity
Fixing log message for better clarity.

Closes #29870 from akshatb1/master.

Lead-authored-by: Akshat Bordia <akshat.bordia31@gmail.com>
Co-authored-by: Akshat Bordia <akshat.bordia@citrix.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-29 08:38:43 -05:00
Tom van Bussel f167002522 [SPARK-32901][CORE] Do not allocate memory while spilling UnsafeExternalSorter
### What changes were proposed in this pull request?

This PR changes `UnsafeExternalSorter` to no longer allocate any memory while spilling. In particular it removes the allocation of a new pointer array in `UnsafeInMemorySorter`. Instead the new pointer array is allocated whenever the next record is inserted into the sorter.

### Why are the changes needed?

Without this change the `UnsafeExternalSorter` could throw an OOM while spilling. The following sequence of events would have triggered an OOM:

1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array, and tries to allocate a new small pointer array.
5. `TaskMemoryManager` tries to allocate the memory backing the small array using `MemoryManager`, but `MemoryManager` is unwilling to give it any memory, as the `TaskMemoryManager` is still holding on to the memory it got for the new large array.
6. `TaskMemoryManager` again asks `UnsafeExternalSorter` to spill, but this time there is nothing to spill.
7. `UnsafeInMemorySorter` receives less memory than it requested, and causes a `SparkOutOfMemoryError` to be thrown, which causes the current task to fail.

With the changes in the PR the following will happen instead:

1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array.
5. `TaskMemoryManager` returns control to `UnsafeExternalSorter.growPointerArrayIfNecessary` (either by returning the new large array or by throwing a `SparkOutOfMemoryError`).
6. `UnsafeExternalSorter` either frees the new large array or it ignores the `SparkOutOfMemoryError` depending on what happened in the previous step.
7. `UnsafeExternalSorter` successfully allocates a new small pointer array and operation continues as normal.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tests were added in `UnsafeExternalSorterSuite` and `UnsafeInMemorySorterSuite`.

Closes #29785 from tomvanbussel/SPARK-32901.

Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2020-09-29 13:05:33 +02:00
Shruti Gumma 173da5bf11 [SPARK-32996][WEB-UI] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
### What changes were proposed in this pull request?
When `peakMemoryMetrics` in `ExecutorSummary` is `Option.empty`, the `ExecutorMetricsJsonSerializer#serialize` method does not call `jsonGenerator.writeObject`. This causes the JSON to be generated with the `peakMemoryMetrics` key added to the serialized string, but no corresponding value.
This then causes an error to be thrown when the next key, `attributes`, is added to the JSON:
`com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value`
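One way to address this, sketched under the assumption that the serializer handles an `Option[ExecutorMetrics]` (not necessarily the exact patch):

```scala
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import org.apache.spark.executor.ExecutorMetrics

// Always write *some* value so the generator never leaves a dangling field name.
class SafeExecutorMetricsSerializer extends JsonSerializer[Option[ExecutorMetrics]] {
  override def serialize(
      metrics: Option[ExecutorMetrics],
      jsonGenerator: JsonGenerator,
      serializerProvider: SerializerProvider): Unit = {
    metrics match {
      case Some(m) => jsonGenerator.writeObject(m) // unchanged happy path
      case None    => jsonGenerator.writeNull()    // previously nothing was written
    }
  }
}
```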

### Why are the changes needed?
At the start of the Spark job, if `peakMemoryMetrics` is `Option.empty`, then it causes
a `com.fasterxml.jackson.core.JsonGenerationException` to be thrown when we navigate to the Executors tab in Spark UI.
Complete stacktrace:

> com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value
> 	at com.fasterxml.jackson.core.JsonGenerator._reportError(JsonGenerator.java:2080)
> 	at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeFieldName(WriterBasedJsonGenerator.java:161)
> 	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:725)
> 	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
> 	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
> 	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
> 	at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:26)
> 	at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents$(IterableSerializerModule.scala:25)
> 	at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
> 	at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
> 	at com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:250)
> 	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
> 	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
> 	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094)
> 	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404)
> 	at org.apache.spark.ui.exec.ExecutorsPage.allExecutorsDataScript$1(ExecutorsTab.scala:64)
> 	at org.apache.spark.ui.exec.ExecutorsPage.render(ExecutorsTab.scala:76)
> 	at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:89)
> 	at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
> 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
> 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
> 	at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
> 	at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
> 	at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
> 	at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
> 	at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
> 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
> 	at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
> 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
> 	at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
> 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
> 	at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
> 	at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
> 	at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
> 	at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
> 	at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> 	at org.sparkproject.jetty.server.Server.handle(Server.java:505)
> 	at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:370)
> 	at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
> 	at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
> 	at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:103)
> 	at org.sparkproject.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
> 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
> 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
> 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
> 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
> 	at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
> 	at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
> 	at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
> 	at java.base/java.lang.Thread.run(Thread.java:834)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #29872 from shrutig/SPARK-32996.

Authored-by: Shruti Gumma <shruti_gumma@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-09-28 10:07:36 -07:00
Chao Sun 8ccfbc114e [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core

### What changes were proposed in this pull request?

This moves and refactors the parallel listing utilities from `InMemoryFileIndex` to Spark core so it can be reused by modules beside SQL. Along the process this also did some cleanups/refactorings:

- Created a `HadoopFSUtils` class under core
- Moved `InMemoryFileIndex.bulkListLeafFiles` into `HadoopFSUtils.parallelListLeafFiles`. It now depends on a `SparkContext` instead of `SparkSession` in SQL. Also added a few parameters which used to be read from `SparkSession.conf`: `ignoreMissingFiles`, `ignoreLocality`, `parallelismThreshold`, `parallelismMax ` and `filterFun` (for additional filtering support but we may be able to merge this with `filter` parameter in future).
- Moved `InMemoryFileIndex.listLeafFiles` into `HadoopFSUtils.listLeafFiles` with similar changes above.

### Why are the changes needed?

Currently the locality-aware parallel listing mechanism only applies to `InMemoryFileIndex`. By moving this to core, we can potentially reuse the same mechanism for other code paths as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Since this is mostly a refactoring, it relies on existing unit tests such as those for `InMemoryFileIndex`.

Closes #29471 from sunchao/SPARK-32381.

Lead-authored-by: Chao Sun <sunchao@apache.org>
Co-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Chao Sun <sunchao@uber.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-09-24 10:58:52 -07:00
Holden Karau 27f6b5a103 [SPARK-32937][SPARK-32980][K8S] Fix decom & launcher tests and add some comments to reduce chance of breakage
### What changes were proposed in this pull request?

Fixes the log strings the decommissioning integration tests look for and adds comments reminding people to run the K8s integration tests when changing those code paths.

### Why are the changes needed?

The strings it looks for have been changed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

WIP: Verify that the K8s jenkins job succeeds

Closes #29854 from holdenk/SPARK-32979-spark-k8s-decom-test-is-broken.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-23 15:39:31 -07:00
Zhen Li d01594e8d1 [SPARK-32886][WEBUI] fix 'undefined' link in event timeline view
### What changes were proposed in this pull request?

Fix ".../jobs/undefined" link from "Event Timeline" in jobs page. Job page link in "Event Timeline" view is constructed by fetching job page link defined in job list below. when job count exceeds page size of job table, only links of jobs in job table can be fetched from page. Other jobs' link would be 'undefined', and links of them in "Event Timeline" are broken, they are redirected to some wired URL like ".../jobs/undefined". This PR is fixing this wrong link issue. With this PR, job link in "Event Timeline" view would always redirect to correct job page.

### Why are the changes needed?

There is a wrong link (".../jobs/undefined") in the "Event Timeline" of the jobs page. For example, the first job in the page below is not in the table below it, as the job count (116) exceeds the page size (100). When clicking its item in "Event Timeline", the page is redirected to ".../jobs/undefined", which is wrong. Links in "Event Timeline" should always be correct.
![undefinedlink](https://user-images.githubusercontent.com/10524738/93184779-83fa6d80-f6f1-11ea-8a80-1a304ca9cbb2.JPG)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested.

Closes #29757 from zhli1142015/fix-link-event-timeline-view.

Authored-by: Zhen Li <zhli@microsoft.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-21 09:05:40 -05:00
Wenchen Fan 0c66813ad9 Revert "[SPARK-32850][CORE] Simplify the RPC message flow of decommission"
This reverts commit 56ae95053d.
2020-09-21 13:28:31 +08:00
yi.wu f1dc479d39 [SPARK-32898][CORE] Fix wrong executorRunTime when task killed before real start
### What changes were proposed in this pull request?

Only calculate the executorRunTime when taskStartTimeNs > 0. Otherwise, set executorRunTime to 0.

### Why are the changes needed?

bug fix.

It's possible for a task to be killed (e.g., by another successful attempt) before it reaches `taskStartTimeNs = System.nanoTime()`. In this case, taskStartTimeNs is still 0 since it hasn't really been initialized, and we get a wrong executorRunTime by calculating System.nanoTime() - taskStartTimeNs.
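A minimal sketch of the guard (names follow this description, not necessarily the exact Executor code):

```scala
// taskStartTimeNs stays 0 if the task was killed before it really started,
// so only compute a duration when it was actually initialized.
def executorRunTimeMs(taskStartTimeNs: Long): Long =
  if (taskStartTimeNs > 0) math.max(0L, (System.nanoTime() - taskStartTimeNs) / 1000000L)
  else 0L
```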

### Does this PR introduce _any_ user-facing change?

Yes, users will see the correct executorRunTime.

### How was this patch tested?

Pass existing tests.

Closes #29789 from Ngone51/fix-SPARK-32898.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-18 14:02:14 -07:00
Tom van Bussel 105225ddbc [SPARK-32911][CORE] Free memory in UnsafeExternalSorter.SpillableIterator.spill() when all records have been read
### What changes were proposed in this pull request?

This PR changes `UnsafeExternalSorter.SpillableIterator` to free its memory (except for the page holding the last record) if it is forced to spill after all of its records have been read. It also makes sure that `lastPage` is freed if `loadNext` is never called again. The latter was necessary to get my test case to succeed (otherwise it would complain about a leak).

### Why are the changes needed?

No memory is freed after calling `UnsafeExternalSorter.SpillableIterator.spill()` when all records have been read, even though it is still holding onto some memory. This may cause a `SparkOutOfMemoryError` to be thrown, even though we could have just freed the memory instead.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A test was added to `UnsafeExternalSorterSuite`.

Closes #29787 from tomvanbussel/SPARK-32911.

Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-18 11:49:26 +00:00
William Hyun 7892887981 [SPARK-32930][CORE] Replace deprecated isFile/isDirectory methods
### What changes were proposed in this pull request?

This PR aims to replace deprecated `isFile` and `isDirectory` methods.

```diff
- fs.isDirectory(hadoopPath)
+ fs.getFileStatus(hadoopPath).isDirectory
```

```diff
- fs.isFile(new Path(inProgressLog))
+ fs.getFileStatus(new Path(inProgressLog)).isFile
```

### Why are the changes needed?

It shows deprecation warnings.

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/1244/consoleFull

```
[warn] /home/jenkins/workspace/spark-master-test-sbt-hadoop-3.2-hive-2.3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:815: method isFile in class FileSystem is deprecated: see corresponding Javadoc for more information.
[warn]             if (!fs.isFile(new Path(inProgressLog))) {
```

```
[warn] /home/jenkins/workspace/spark-master-test-sbt-hadoop-3.2-hive-2.3/core/src/main/scala/org/apache/spark/SparkContext.scala:1884: method isDirectory in class FileSystem is deprecated: see corresponding Javadoc for more information.
[warn]           if (fs.isDirectory(hadoopPath)) {
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the Jenkins.

Closes #29796 from williamhyun/filesystem.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-18 18:13:11 +09:00
yi.wu a54a6a0113 [SPARK-32287][CORE] Fix flaky o.a.s.ExecutorAllocationManagerSuite on GithubActions
### What changes were proposed in this pull request?

To fix the flaky `ExecutorAllocationManagerSuite`: Avoid first `schedule()` invocation after `ExecutorAllocationManager` started.

### Why are the changes needed?

`ExecutorAllocationManagerSuite` is still flaky, see:

https://github.com/apache/spark/pull/29722/checks?check_run_id=1117979237

By checking the below logs, we can see that there's a race condition between thread `pool-1-thread-1-ScalaTest-running` and thread `spark-dynamic-executor-allocation`.  The only possibility of thread `spark-dynamic-executor-allocation` becoming active is the first time invocation of `schedule()`(since the `TEST_SCHEDULE_INTERVAL`(30s) is really long, so it's impossible the second invocation would happen).  Thus, I think we shall avoid the first invocation too.

```scala
20/09/15 12:41:20.831 pool-1-thread-1-ScalaTest-running-ExecutorAllocationManagerSuite INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
20/09/15 12:41:20.832 spark-dynamic-executor-allocation INFO ExecutorAllocationManager: Requesting 2 new executors because tasks are backlogged (new desired total will be 4 for resource profile id: 0)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The flakiness can't be reproduced locally, so it's hard to say whether it has been completely fixed by now. We need time to see the result.

Closes #29773 from Ngone51/fix-SPARK-32287.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-17 11:20:50 +00:00
Tom van Bussel e5e54a3614 [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
### What changes were proposed in this pull request?

This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks whether it has spilled already, by checking whether `inMemSorter` is null. It also allows it to spill other `UnsafeSorterIterator`s than `UnsafeInMemorySorter.SortedIterator`.

### Why are the changes needed?

Before this PR `UnsafeExternalSorter.SpillableIterator` could not spill when there are NULLs in the input and radix sorting is used. Currently, Spark determines whether UnsafeExternalSorter.SpillableIterator has not spilled yet by checking whether `upstream` is an instance of `UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and there are NULLs in the input however, `upstream` will be an instance of `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume that the `SpillableIterator` iterator has spilled already, and therefore cannot spill again when it's supposed to spill.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A test was added to `UnsafeExternalSorterSuite` (and therefore also to `UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test failed in `UnsafeExternalSorterRadixSortSuite` without this patch.

Closes #29772 from tomvanbussel/SPARK-32900.

Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2020-09-17 12:35:40 +02:00
yi.wu 56ae95053d [SPARK-32850][CORE] Simplify the RPC message flow of decommission
### What changes were proposed in this pull request?

This PR cleans up the RPC message flow among the multiple decommission use cases, it includes changes:

* Keep `Worker`'s decommission status be consistent between the case where decommission starts from `Worker` and the case where decommission starts from the `MasterWebUI`: sending `DecommissionWorker` from `Master` to `Worker` in the latter case.

* Change from two-way communication to one-way communication when notifying decommission between driver and executor: it's obviously unnecessary for the executor to acknowledge the decommission status to the driver since the decommission request is from the driver. And it's same in reverse.

* Only send one message instead of two(`DecommissionSelf`/`DecommissionBlockManager`) when decommission the executor: executor and `BlockManager` are in the same JVM.

* Clean up codes around here.

### Why are the changes needed?

Before:

<img width="1948" alt="WeChat56c00cc34d9785a67a544dca036d49da" src="https://user-images.githubusercontent.com/16397174/92850308-dc461c80-f41e-11ea-8ac0-287825f4e0c4.png">

After:
<img width="1968" alt="WeChat05f7afb017e3f0132394c5e54245e49e" src="https://user-images.githubusercontent.com/16397174/93189571-de88dd80-f774-11ea-9300-1943920aa27d.png">

(Note the diagrams only counts those RPC calls that needed to go through the network. Local RPC calls are not counted here.)

After this change, we removed 6 of the original RPC calls and added one more RPC call to keep the decommission status consistent for the Worker. The RPC flow also becomes clearer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated existing tests.

Closes #29722 from Ngone51/simplify-decommission-rpc.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-16 15:00:31 +00:00
Zhenhua Wang 99384d1e83 [SPARK-32738][CORE] Should reduce the number of active threads if fatal error happens in Inbox.process
### What changes were proposed in this pull request?

Processing for `ThreadSafeRpcEndpoint` is controlled by `numActiveThreads` in `Inbox`. Now if any fatal error happens during `Inbox.process`, `numActiveThreads` is not reduced. Then other threads cannot process messages in that inbox, which causes the endpoint to "hang". For other types of endpoints, we should also keep `numActiveThreads` correct.

This problem is more serious in previous Spark 2.x versions since the driver, executor and block manager endpoints are all thread safe endpoints.

To fix this, we should reduce the number of active threads if fatal error happens in `Inbox.process`.
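A minimal sketch of that bookkeeping, using a stand-in class rather than Spark's actual `Inbox`:

```scala
// The counter must be decremented on every exit path, including fatal errors;
// otherwise other threads consider the inbox busy forever and it appears to hang.
class ToyInbox {
  private var numActiveThreads = 0

  def process(handleMessage: () => Unit): Unit = {
    synchronized { numActiveThreads += 1 }
    try {
      handleMessage()
    } finally {
      synchronized { numActiveThreads -= 1 } // now also runs when a fatal error is thrown
    }
  }

  def activeThreads: Int = synchronized { numActiveThreads }
}
```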

### Why are the changes needed?

`numActiveThreads` is not correct when fatal error happens and will cause the described problem.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add a new test.

Closes #29580 from wzhfy/deal_with_fatal_error.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-15 06:46:17 +00:00
yi.wu 0811666ab1 [SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks
### What changes were proposed in this pull request?

This PR proposes to avoid scheduling the (non-zombie) TaskSetManager which has no pending tasks.

### Why are the changes needed?

Currently, Spark always tries to schedule a (non-zombie) TaskSetManager even if it has no pending tasks. This causes notable problems for the barrier TaskSetManager: 1. `calculateAvailableSlots` can be called multiple times for a launched barrier TaskSetManager; 2. the user would see the "Skip current round of resource offers for barrier stage" log message for a launched barrier TaskSetManager all the time until the barrier TaskSetManager finishes, which is quite confusing.

Besides, scheduling a TaskSetManager always involves many function invocations even if there're no pending tasks.

Therefore, I think we can skip those unschedulable TaskSetManagers to avoid the potential overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass existing tests.

Closes #29750 from Ngone51/filter-out-unschedulable-stage.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-14 21:15:06 -07:00
LantaoJin 7a9b066c66 [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
If the original value is saved successfully (TorrentBroadcast.scala: L133) but the following `blockifyObject()` (L137) or store-piece (L147) steps fail, there is no opportunity to release the broadcast from memory.

This patch is to remove all pieces of the broadcast when failed to blockify or failed to store some pieces of a broadcast.
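A generic sketch of that cleanup pattern, using a toy in-memory store rather than Spark's `BlockManager`:

```scala
import scala.collection.mutable

// If blockifying the value or storing any piece fails, remove every key that was
// already stored so the partially written broadcast does not leak.
def writeBroadcast[T](id: Long, value: T, store: mutable.Map[String, Any])
                     (blockify: T => Seq[Array[Byte]]): Unit = {
  val written = mutable.ArrayBuffer.empty[String]
  def put(key: String, v: Any): Unit = { store.put(key, v); written += key }
  try {
    put(s"broadcast_$id", value)
    blockify(value).zipWithIndex.foreach { case (piece, i) =>
      put(s"broadcast_${id}_piece$i", piece)
    }
  } catch {
    case t: Throwable =>
      written.foreach(store.remove) // undo partial writes before surfacing the error
      throw t
  }
}
```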

### Why are the changes needed?
We use the Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and caused the driver to do full GCs. We killed the bad query, but we found the driver's memory usage was still high and full GCs were still frequent. By investigating the GC dump and log, we found that broadcasts may leak memory.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
> 2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
> 116G->112G(170G), 184.9121920 secs]
> [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
> 1: 676531691 72035438432 [B
> 2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
> 3: 99551 12018117568 [Ljava.lang.Object;
> 4: 26570 4349629040 [I
> 5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
> 6: 1708819 256299456 [C
> 7: 2338 179615208 [J
> 8: 1703669 54517408 java.lang.String
> 9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
> 10: 177396 25545024 java.net.URI
> ...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested. A unit test is hard to write here, and the patch is straightforward.

Closes #29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-14 18:24:52 -07:00
Ankur Dave 72550c3be7 [SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold
### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
            .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
            .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-14 13:58:15 -07:00
gengjiaan e558b8a0fd [SPARK-31847][CORE][TESTS] DAGSchedulerSuite: Rewrite the test framework to support apply specified spark configurations
### What changes were proposed in this pull request?
`DAGSchedulerSuite` has some issues:
`afterEach` and `init` are called when the `SparkConf` of the default `SparkContext` has no configuration that the test case must set. This causes the `SparkContext` initialized in `beforeEach` to be discarded without being used, resulting in waste. On the other hand, the flexibility to add configurations to `SparkConf` should be addressed by the test framework.

Test suites that inherit `LocalSparkContext` can be simplified.

### Why are the changes needed?
Reduces the overhead of initializing `SparkContext`.
Rewrites the test framework to support applying specified Spark configurations.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Jenkins test.

Closes #29228 from beliefer/extend-test-frame-for-dag.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-14 11:57:29 +09:00
yangjie01 513d51a2c5 [SPARK-32808][SQL] Fix some test cases of sql/core module in scala 2.13
### What changes were proposed in this pull request?
The purpose of this PR is to partially resolve [SPARK-32808](https://issues.apache.org/jira/browse/SPARK-32808); a total of 26 failed test cases were fixed. The related suites are as follows:

- `StreamingAggregationSuite` related test cases (2 FAILED -> Pass)

- `GeneratorFunctionSuite` related test cases (2 FAILED -> Pass)

- `UDFSuite` related test cases (2 FAILED -> Pass)

- `SQLQueryTestSuite` related test cases (5 FAILED -> Pass)

- `WholeStageCodegenSuite` related test cases (1 FAILED -> Pass)

- `DataFrameSuite` related test cases (3 FAILED -> Pass)

- `OrcV1QuerySuite\OrcV2QuerySuite` related test cases (4 FAILED -> Pass)

- `ExpressionsSchemaSuite` related test cases (1 FAILED -> Pass)

- `DataFrameStatSuite` related test cases (1 FAILED -> Pass)

- `JsonV1Suite\JsonV2Suite\JsonLegacyTimeParserSuite` related test cases (6 FAILED -> Pass)

The main change of this pr as following:

- Fix Scala 2.13 compilation problems in `ShuffleBlockFetcherIterator` and `Analyzer`

- Specified `Seq` as `scala.collection.Seq` in `objects.scala` and `GenericArrayData`, because the `Seq` used internally may be a `mutable.ArraySeq` and it is not easy to call `.toSeq`

- `Seq` should be specified as `scala.collection.Seq` when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]`, because the data may be a `mutable.ArraySeq` while `Seq` means `immutable.Seq` in Scala 2.13 (a short illustration follows this list)

- Use a compatible way to let the `+` and `-` methods of `Decimal` have the same behavior in Scala 2.12 and Scala 2.13

- Call `toList` in the `RelationalGroupedDataset.toDF` method when `groupingExprs` is of `Stream` type, because `Stream` can't be serialized in Scala 2.13

- Add a manual sort to `classFunsMap` in `ExpressionsSchemaSuite`, because `Iterable.groupBy` in Scala 2.13 returns a different result than `TraversableLike.groupBy` in Scala 2.12
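A short illustration of the `scala.collection.Seq` caveat, reading a column so it works under both Scala versions:

```scala
import org.apache.spark.sql.Row

// Under Scala 2.13 the value backing an array column may be a mutable.ArraySeq,
// so read it as scala.collection.Seq rather than the default (immutable) Seq.
def readStrings(row: Row, i: Int): scala.collection.Seq[String] =
  row.getAs[scala.collection.Seq[String]](i)
```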

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?

`Seq` should be specified as `scala.collection.Seq` when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]`, because the data may be a `mutable.ArraySeq` while `Seq` means `immutable.Seq` in Scala 2.13.

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Action

- Scala 2.13: Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests  -pl sql/core -Pscala-2.13 -am
mvn test -pl sql/core -Pscala-2.13
```

**Before**
```
Tests: succeeded 8166, failed 319, canceled 1, ignored 52, pending 0
*** 319 TESTS FAILED ***

```

**After**

```
Tests: succeeded 8204, failed 286, canceled 1, ignored 52, pending 0
*** 286 TESTS FAILED ***

```

Closes #29660 from LuciferYang/SPARK-32808.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-09 08:53:44 -05:00
Thomas Graves 514bf563a7 [SPARK-32823][WEB UI] Fix the master ui resources reporting
### What changes were proposed in this pull request?

Fixes the master UI so that resource totals are properly summed across multiple workers in the field:
Resources in use: 0 / 8 gpu

The bug here is that it was creating MutableResourceInfo and then reducing using the + operator. The + operator in MutableResourceInfo simply adds the addresses from one to the addresses of the other. But it uses a HashSet, so if the addresses are the same you lose the correct amount; i.e., if worker1 has GPU addresses 0,1,2,3 and worker2 also has addresses 0,1,2,3, then you only see 4 total GPUs when there are 8.

In this case we don't really need to create the MutableResourceInfo at all, because we just want the sums for used and total, so its use is simply removed. The other uses of it are per worker, so those should be okay.

### Why are the changes needed?

fix UI

### Does this PR introduce _any_ user-facing change?

UI

### How was this patch tested?

tested manually on standalone cluster with multiple workers and multiple GPUs and multiple fpgas

Closes #29683 from tgravescs/SPARK-32823.

Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-09 10:33:21 +09:00
Thomas Graves e8634d8f6f [SPARK-32824][CORE] Improve the error message when the user forgets the .amount in a resource config
### What changes were proposed in this pull request?

If the user forgets to specify .amount on a resource config like spark.executor.resource.gpu, the error message thrown is very confusing:

```
ERROR SparkContext: Error initializing SparkContext.java.lang.StringIndexOutOfBoundsException: String index out of range:
-1 at java.lang.String.substring(String.java:1967) at
```

This change makes Spark throw a readable error instead.
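For reference, the correctly spelled config next to the misconfiguration that used to trigger the exception:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.resource.gpu.amount", "2") // correct: note the ".amount" suffix
// .set("spark.executor.resource.gpu", "2")       // forgetting ".amount" used to surface
//                                                // as StringIndexOutOfBoundsException
```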

### Why are the changes needed?

The current error is confusing for users.
### Does this PR introduce _any_ user-facing change?

just error message

### How was this patch tested?

Tested manually on standalone cluster

Closes #29685 from tgravescs/SPARK-32824.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-09 10:28:40 +09:00
yi.wu 125cbe3ae0 [SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl
### What changes were proposed in this pull request?

The motivation of this PR is to avoid caching the removed decommissioned executors in `TaskSchedulerImpl`. The cache is introduced in https://github.com/apache/spark/pull/29422. The cache will hold the `isHostDecommissioned` info for a while. So if the task `FetchFailure` event comes after the executor loss event, `DAGScheduler` can still get the `isHostDecommissioned` from the cache and unregister the host shuffle map status when the host is decommissioned too.

This PR tries to achieve the same goal without the cache. Instead of saving the `workerLost` in `ExecutorUpdated` / `ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we could save the `hostOpt` directly. When the host is decommissioned or lost too, the `hostOpt` can be a specific host address. Otherwise, it's `None` to indicate that only the executor is decommissioned or lost.

Now that we have the host info, we can also unregister the host shuffle map status when `executorLost` is triggered for the decommissioned executor.

Besides, this PR also includes a few cleanups around the touched code.

### Why are the changes needed?

It helps to unregister the shuffle map status earlier for both decommission and normal executor lost cases.

It also saves memory in  `TaskSchedulerImpl` and simplifies the code a little bit.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR only refactor the code. The original behaviour should be covered by `DecommissionWorkerSuite`.

Closes #29579 from Ngone51/impr-decom.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-08 04:40:13 +00:00
yi.wu e6fec33f18 [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
### What changes were proposed in this pull request?

This PR adds support to read host-local shuffle data from disk directly when external shuffle service is disabled.

Similar to #25299, we first try to get local disk directories for the shuffle data, which is located at the same host with the current executor. The only difference is, in #25299, it gets the directories from the external shuffle service while in this PR, it gets the directory from the executors.

To implement the feature, this PR extends `HostLocalDirManager` for both `ExternalBlockStoreClient` and `NettyBlockTransferService`. Also, this PR adds `getHostLocalDirs` to `NettyBlockTransferService`, as `ExternalBlockStoreClient` already has, in order to send the get-dir request to the corresponding executor. And this PR reuses the request message `GetLocalDirsForExecutors` for simplicity.

### Why are the changes needed?

After SPARK-27651 / #25299, Spark can read host-local shuffle data directly from disk when the external shuffle service is enabled. To extend the feature, we can also support it when the external shuffle service is disabled.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, to use the host-local shuffle reading feature, users had to enable not only `spark.shuffle.readHostLocalDisk` but also `spark.shuffle.service.enabled`. After this PR, enabling `spark.shuffle.readHostLocalDisk` is enough, and the external shuffle service is no longer a prerequisite.
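In SparkConf terms, a minimal sketch of the configuration described above:

```scala
import org.apache.spark.SparkConf

// After this change, host-local shuffle reads need only the read flag;
// previously spark.shuffle.service.enabled=true was also required.
val conf = new SparkConf()
  .set("spark.shuffle.readHostLocalDisk", "true")
```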

### How was this patch tested?

Added test and tested manually.

Closes #28911 from Ngone51/support_node_local_shuffle.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-02 13:03:44 -07:00
angerszhu 55ce49ed28 [SPARK-32400][SQL][TEST][FOLLOWUP][TEST-MAVEN] Fix resource loading error in HiveScripTransformationSuite
### What changes were proposed in this pull request?
#29401 moved `test_script.py` from the sql/hive module to the sql/core module, causing a resource-loading issue in HiveScripTransformationSuite.

### Why are the changes needed?
This issue causes the Jenkins Maven tests to fail:

spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/
spark-master-test-maven-hadoop-3.2-hive-2.3-jdk-11:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-hive-2.3-jdk-11/
spark-master-test-maven-hadoop-3.2-hive-2.3:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-hive-2.3/
![image](https://user-images.githubusercontent.com/46485123/91681585-71285a80-eb81-11ea-8519-99fc9783d6b9.png)

![image](https://user-images.githubusercontent.com/46485123/91681010-aaf86180-eb7f-11ea-8dbb-61365a3b0ab4.png)

Error as below:
```
 Exception thrown while executing Spark plan:
 HiveScriptTransformation [a#349299, b#349300, c#349301, d#349302, e#349303], python /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/hive/file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar!/test_script.py, [a#349309, b#349310, c#349311, d#349312, e#349313], ScriptTransformationIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),false)
+- Project [_1#349288 AS a#349299, _2#349289 AS b#349300, _3#349290 AS c#349301, _4#349291 AS d#349302, _5#349292 AS e#349303]
   +- LocalTableScan [_1#349288, _2#349289, _3#349290, _4#349291, _5#349292]

 == Exception ==
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18021.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18021.0 (TID 37324) (192.168.10.31 executor driver): org.apache.spark.SparkException: Subprocess exited with status 2. Error: python: can't open file '/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/hive/file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar!/test_script.py': [Errno 2] No such file or directory

 at org.apache.spark.sql.execution.BaseScriptTransformationExec.checkFailureAndPropagate(BaseScriptTransformationExec.scala:180)
 at org.apache.spark.sql.execution.BaseScriptTransformationExec.checkFailureAndPropagate$(BaseScriptTransformationExec.scala:157)
 at org.apache.spark.sql.hive.execution.HiveScriptTransformationExec.checkFailureAndPropagate(HiveScriptTransformationExec.scala:49)
 at org.apache.spark.sql.hive.execution.HiveScriptTransformationExec$$anon$1.hasNext(HiveScriptTransformationExec.scala:110)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
 at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:127)
 at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1426)
 at o
```
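The root cause is that a path inside a jar (`...jar!/test_script.py`) is not an executable file path. A common workaround is to materialize the classpath resource to a real temporary file before handing its path to the subprocess; the sketch below illustrates that general pattern and is not necessarily what this PR does:

```scala
import java.nio.file.{Files, StandardCopyOption}

// Copy a resource bundled in a jar onto the local filesystem so an external
// interpreter (here: python) can open it by path.
def materializeResource(name: String): String = {
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(name)
  require(in != null, s"resource $name not found on the classpath")
  val tmp = Files.createTempFile("test_script", ".py")
  try Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING) finally in.close()
  tmp.toFile.deleteOnExit()
  tmp.toString
}
```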
### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Existing UTs.

Closes #29588 from AngersZhuuuu/SPARK-32400-FOLLOWUP.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-02 18:27:29 +09:00
Udbhav30 065f17386d [SPARK-32481][CORE][SQL] Support truncate table to move data to trash
### What changes were proposed in this pull request?
Instead of deleting the data, we can move the data to trash.
Based on the configuration provided by the user it will be deleted permanently from the trash.

### Why are the changes needed?
Instead of directly deleting the data, we can provide flexibility to move data to the trash and then delete it permanently.
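The underlying mechanism is Hadoop's trash facility; a hedged sketch of the idea (not this PR's exact code, and the helper name is made up for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

// When trash is enabled, move the table directory to the Hadoop trash instead of
// deleting it outright; fs.trash.interval then controls when it is purged for good.
def truncatePath(path: Path, hadoopConf: Configuration, useTrash: Boolean): Unit = {
  val fs: FileSystem = path.getFileSystem(hadoopConf)
  val movedToTrash = useTrash && Trash.moveToAppropriateTrash(fs, path, hadoopConf)
  if (!movedToTrash) {
    fs.delete(path, true) // fall back to a plain recursive delete
  }
}
```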

### Does this PR introduce _any_ user-facing change?
Yes. After `TRUNCATE TABLE`, the data is no longer deleted permanently right away.
It is first moved to the trash and only deleted permanently after the configured retention time.

### How was this patch tested?
new UTs added

Closes #29552 from Udbhav30/truncate.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-30 10:25:32 -07:00
Devesh Agrawal b786f31a42 [SPARK-32643][CORE][K8S] Consolidate state decommissioning in the TaskSchedulerImpl realm
### What changes were proposed in this pull request?
The decommissioning state is a bit fragmented across two places in TaskSchedulerImpl:

https://github.com/apache/spark/pull/29014/ stored the incoming decommission info messages in TaskSchedulerImpl.executorsPendingDecommission.
https://github.com/apache/spark/pull/28619/ stored just the executor end time in the map TaskSetManager.tidToExecutorKillTimeMapping (which in turn is contained in TaskSchedulerImpl).
While the two states are not really overlapping, it's a bit of a code hygiene concern to save this state in two places.

With https://github.com/apache/spark/pull/29422, TaskSchedulerImpl is emerging as the place where all decommissioning bookkeeping is kept within the driver. So this PR consolidates the information in _tidToExecutorKillTimeMapping_ into _executorsPendingDecommission_.

However, in order to do so, we need to walk away from keeping the raw ExecutorDecommissionInfo messages and instead keep another class ExecutorDecommissionState. This decoupling will allow the RPC message class ExecutorDecommissionInfo to evolve independently from the bookkeeping ExecutorDecommissionState.
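A rough sketch of the consolidation (the shapes below are assumptions based on the description above, not the exact Spark classes):

```scala
import scala.collection.mutable

// The driver-side bookkeeping class is decoupled from the RPC message and carries
// everything the scheduler tracks, including the kill time that used to live in
// TaskSetManager.tidToExecutorKillTimeMapping.
case class ExecutorDecommissionState(message: String, killTimeMs: Option[Long] = None)

// One map in TaskSchedulerImpl replaces the two separate pieces of state.
val executorsPendingDecommission =
  mutable.HashMap.empty[String, ExecutorDecommissionState]
```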

### Why are the changes needed?

This is just a code cleanup. These two features were added independently and it's time to consolidate their state for good hygiene.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #29452 from agrawaldevesh/consolidate_decom_state.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-08-26 15:16:47 -07:00
Dongjoon Hyun 2dee4352a0 Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data to trash"
This reverts commit 5c077f0580.
2020-08-26 11:24:35 -07:00
Udbhav30 5c077f0580 [SPARK-32481][CORE][SQL] Support truncate table to move data to trash
### What changes were proposed in this pull request?
Instead of deleting the data, we can move the data to trash.
Based on the configuration provided by the user it will be deleted permanently from the trash.

### Why are the changes needed?
Instead of directly deleting the data, we can provide flexibility to move data to the trash and then delete it permanently.

### Does this PR introduce _any_ user-facing change?
Yes. After `TRUNCATE TABLE`, the data is no longer deleted permanently right away.
It is first moved to the trash and only deleted permanently after the configured retention time.

### How was this patch tested?
new UTs added

Closes #29387 from Udbhav30/tuncateTrash.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-25 23:38:43 -07:00
Thomas Graves 2feab4ef4f [SPARK-32287][TESTS][FOLLOWUP] Add debugging for flaky ExecutorAllocationManagerSuite
### What changes were proposed in this pull request?

This fixes a flaky test in ExecutorAllocationManagerSuite. The issue is a race around when numExecutorsToAddPerResourceProfileId gets reset during a reset. The fix is to always set those values back to 1 when we call reset().

### Why are the changes needed?

fixing flaky test in ExecutorAllocationManagerSuite

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

ran the unit test via this PR a bunch of times and the fix seems to be working.

Closes #29508 from tgravescs/debugExecAllocTest.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-25 23:05:35 +09:00
Daniel Moore a3179a78b6 [SPARK-32664][CORE] Switch the log level from info to debug at BlockManager.getLocalBlockData
### What changes were proposed in this pull request?
Changing an info log to a debug log based on SPARK-32664

### Why are the changes needed?
It is outlined in SPARK-32664

### Does this PR introduce _any_ user-facing change?
There are changes to the debug and info logs

### How was this patch tested?
Tested by looking at the logs

Closes #29527 from dmoore62/SPARK-32664.

Authored-by: Daniel Moore <moore@knights.ucf.edu>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-25 22:45:44 +09:00
Baohe Zhang 9151a589a7 [SPARK-31608][CORE][WEBUI][TEST] Add test suites for HybridStore and HistoryServerMemoryManager
### What changes were proposed in this pull request?
This pull request adds 2 test suites for 2 new classes, HybridStore and HistoryServerMemoryManager, which were created in https://github.com/apache/spark/pull/28412. This pull request also makes some minor changes in these 2 classes to expose some variables for testing. Besides the 2 suites, this pull request adds a unit test in FsHistoryProviderSuite to test parsing logs with HybridStore.

### Why are the changes needed?
Unit tests are needed for new features.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

Closes #29509 from baohe-zhang/SPARK-31608-UT.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-08-25 12:07:49 +09:00
Michael Munday bc23bb7882 [SPARK-32588][CORE][TEST] Fix SizeEstimator initialization in tests
In order to produce consistent results from SizeEstimator the tests
override some system properties that are used during SizeEstimator
initialization. However there were several places where either the
compressed references property wasn't set or the system properties
were set but the SizeEstimator not re-initialized.

This caused failures when running the tests with a large heap build
of OpenJ9 because it does not use compressed references unlike most
environments.

### What changes were proposed in this pull request?
Initialize SizeEstimator class explicitly in the tests where required to avoid relying on a particular environment.

### Why are the changes needed?
Test failures can be seen when compressed references are disabled (e.g. using an OpenJ9 large heap build or Hotspot with a large heap).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tests were run on a machine running an OpenJ9 large heap build.

Closes #29407 from mundaym/fix-sizeestimator.

Authored-by: Michael Munday <mike.munday@ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-08-24 10:19:30 -05:00
Smith Cruise a4d785dadc [MINOR] Typo in ShuffleMapStage.scala
Simple typo

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #29486 from Smith-Cruise/patch-1.

Authored-by: Smith Cruise <chendingchao1@126.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-08-22 13:26:59 -05:00
yangjie01 25c7d0fe6a [SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13
### What changes were proposed in this pull request?
The purpose of this pr is to resolve [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526), all remaining failed cases are fixed.

The main change of this pr as follow:

- Change `ExecutorAllocationManager.scala` so the core module compiles in Scala 2.13; this was a blocking problem

- Change `Seq[_]` to `scala.collection.Seq[_]` in the failed cases (see the short illustration after the plan comparison below)

- Added a different expected plan for `Test 4: Star with several branches` of StarJoinCostBasedReorderSuite for Scala 2.13, because the candidate plans:

```
Join Inner, (d1_pk#5 = f1_fk1#0)
:- Join Inner, (f1_fk2#1 = d2_pk#8)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```
and

```
Join Inner, (f1_fk2#1 = d2_pk#8)
:- Join Inner, (d1_pk#5 = f1_fk1#0)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```

have the same cost `Cost(200,9200)`, but `HashMap` was rewritten in Scala 2.13 and the different iteration order leads to different results.
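On the `Seq[_]` bullet above: in Scala 2.13 the default `Seq` alias points at `scala.collection.immutable.Seq`, so type tests and signatures that previously accepted mutable sequences stop matching. A small illustration (not Spark code):

```scala
import scala.collection.mutable.ArrayBuffer

def describe(v: Any): String = v match {
  // In Scala 2.12 a plain `Seq[_]` pattern matched an ArrayBuffer; in 2.13 it no
  // longer does, because `Seq` now means immutable.Seq. Widening the pattern to
  // scala.collection.Seq keeps both versions working.
  case s: scala.collection.Seq[_] => s"a sequence of ${s.length} elements"
  case _                          => "not a sequence"
}

describe(ArrayBuffer(1, 2, 3)) // "a sequence of 3 elements" on both 2.12 and 2.13
```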

This PR fixes the following test cases:

- LiteralExpressionSuite (1 FAILED -> PASS)
- StarJoinCostBasedReorderSuite (1 FAILED -> PASS)
- ObjectExpressionsSuite (2 FAILED -> PASS)
- ScalaReflectionSuite (1 FAILED -> PASS)
- RowEncoderSuite (10 FAILED -> PASS)
- ExpressionEncoderSuite (ABORTED -> PASS)

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action

- Scala 2.13: Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests  -pl sql/catalyst -Pscala-2.13 -am
mvn test -pl sql/catalyst -Pscala-2.13
```

**Before**
```
Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
*** 1 SUITE ABORTED ***
*** 15 TESTS FAILED ***
```

**After**

```
Tests: succeeded 4338, failed 0, canceled 0, ignored 6, pending 0
All tests passed.
```

Closes #29434 from LuciferYang/sql-catalyst-tests.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-08-22 09:24:16 -05:00
Brandon Jiang 1450b5e095 [MINOR][DOCS] fix typo for docs,log message and comments
### What changes were proposed in this pull request?
Fix typo for docs, log messages and comments

### Why are the changes needed?
typo fix to increase readability

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual testing has been performed to verify the updated docs, log messages and comments.

Closes #29443 from brandonJY/spell-fix-doc.

Authored-by: Brandon Jiang <Brandon.jiang.a@outlook.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-08-22 06:45:35 +09:00
yi.wu 44a288fc41 [SPARK-32653][CORE] Decommissioned host/executor should be considered as inactive in TaskSchedulerImpl
### What changes were proposed in this pull request?

Add a decommissioning-status check when determining whether a host or executor is active, so that a decommissioned host or executor is considered inactive.

### Why are the changes needed?

First of all, this PR is not a correctness bug fix but an improvement. The main problem we want to fix is that a decommissioned host or executor should be considered inactive.

`TaskSetManager.computeValidLocalityLevels` depends on `TaskSchedulerImpl.isExecutorAlive/hasExecutorsAliveOnHost` to calculate the locality levels. Therefore, the `TaskSetManager` could also get the corresponding locality levels of decommissioned hosts or executors if they're not considered inactive. However, on the other side, `CoarseGrainedSchedulerBackend` won't construct a `WorkerOffer` for those decommissioned executors. That also means `TaskSetManager` might never have a chance to launch tasks at certain locality levels, but only suffers unnecessary delay because of delay scheduling. So, this PR helps to reduce this kind of unnecessary delay by making decommissioned hosts/executors inactive in `TaskSchedulerImpl`.
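Conceptually, the liveness check gains one extra condition; a hedged sketch (the parameter names are assumptions, not the exact TaskSchedulerImpl code):

```scala
// An executor counts as alive only if it is registered and not pending decommission;
// the same idea applies at host granularity (hasExecutorsAliveOnHost).
def isExecutorAlive(
    execId: String,
    registeredExecutors: Set[String],
    executorsPendingDecommission: Set[String]): Boolean = {
  registeredExecutors.contains(execId) && !executorsPendingDecommission.contains(execId)
}
```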

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests

Closes #29468 from Ngone51/fix-decom-alive.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Co-authored-by: wuyi <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-20 12:00:32 +00:00
Xingbo Jiang f793977e9a [SPARK-32658][CORE] Fix PartitionWriterStream partition length overflow
### What changes were proposed in this pull request?

The `count` in `PartitionWriterStream` should be a long value instead of an int. The issue was introduced by apache/spark@abef84a. When the overflow happens, the shuffle index file records a wrong index for a reduceId, thus leading to a `FetchFailedException: Stream is corrupted` error.
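The overflow itself is the classic 2 GiB limit of a 32-bit counter; a minimal sketch of the type change (not the actual PartitionWriterStream code):

```scala
// Counting the bytes written with an Int silently wraps past Int.MaxValue (~2 GiB),
// which corrupts the offsets recorded in the shuffle index file.
var count: Long = 0L // was effectively an Int counter before the fix

def recordWrite(numBytes: Int): Unit = {
  count += numBytes
}
```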

Besides the fix, I also added some debug logs, so in the future it's easier to debug similar issues.

### Why are the changes needed?

This is a regression and bug fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A Spark user reported this issue when migrating their workload to 3.0. One of the jobs failed deterministically on Spark 3.0 without the patch, and the job succeeded after the fix was applied.

Closes #29474 from jiangxb1987/fixPartitionWriteStream.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-20 07:08:30 +00:00
yi.wu a1a32d2eb5 [SPARK-32600][CORE] Unify task name in some logs between driver and executor
### What changes were proposed in this pull request?

This PR replaces some arbitrary task names in logs with the widely used task name (e.g. "task 0.0 in stage 1.0 (TID 1)") among driver and executor. This will change the task name in `TaskDescription` by appending TID.
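For reference, the convention being adopted everywhere is just a formatted string over the task's identifiers; a sketch (not the exact TaskDescription change):

```scala
// "task <index>.<attempt> in stage <stageId>.<stageAttemptId> (TID <taskId>)"
def taskName(index: Int, attempt: Int, stageId: Int, stageAttemptId: Int, taskId: Long): String =
  s"task $index.$attempt in stage $stageId.$stageAttemptId (TID $taskId)"

// taskName(0, 0, 1, 0, 1L) == "task 0.0 in stage 1.0 (TID 1)"
```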

### Why are the changes needed?

Some logs are still using the TID (a.k.a. `taskId`) alone as the task name, e.g.,

7f275ee597/core/src/main/scala/org/apache/spark/executor/Executor.scala (L786)

7f275ee597/core/src/main/scala/org/apache/spark/executor/Executor.scala (L632-L635)

And the task thread name also only has the `taskId`:

7f275ee597/core/src/main/scala/org/apache/spark/executor/Executor.scala (L325)

As mentioned in https://github.com/apache/spark/pull/1259, TID itself does not capture stage or retries, making it harder to correlate with the application. It's inconvenient when debugging applications.

Actually, task names like "task 0.0 in stage 1.0 (TID 1)" have already been used widely since https://github.com/apache/spark/pull/1259. We'd better follow that naming convention.

### Does this PR introduce _any_ user-facing change?

Yes. Users will see more consistent task names in the logs.

### How was this patch tested?

Manually checked.

Closes #29418 from Ngone51/unify-task-name.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-19 08:44:49 +00:00
yi.wu 3092527f75 [SPARK-32651][CORE] Decommission switch configuration should have the highest hierarchy
### What changes were proposed in this pull request?

Rename `spark.worker.decommission.enabled` to `spark.decommission.enabled` and move it from `org.apache.spark.internal.config.Worker` to `org.apache.spark.internal.config.package`.
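For context, a minimal sketch of what enabling the renamed switch looks like (the value shown is illustrative; the feature remains off by default per the description below):

```scala
import org.apache.spark.SparkConf

// The renamed switch is cluster-manager agnostic: one top-level key,
// spark.decommission.enabled, replaces the Worker-scoped
// spark.worker.decommission.enabled described above.
val conf = new SparkConf().set("spark.decommission.enabled", "true")
```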

### Why are the changes needed?

Decommissioning is already supported in Standalone and k8s, and may be supported in YARN (https://github.com/apache/spark/pull/27636) in the future. Therefore, the switch configuration should sit at the highest hierarchy rather than belong to Standalone's Worker. In other words, it should be independent of the cluster managers.

### Does this PR introduce _any_ user-facing change?

No, as the decommission feature hasn't been released.

### How was this patch tested?

Passes existing tests.

Closes #29466 from Ngone51/fix-decom-conf.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-19 06:53:06 +00:00