### What changes were proposed in this pull request?
This patch proposes to fix the issue that storage memory is not decreased even when a block is removed from BlockManager. The issue was originally found when removing a broadcast did not update the storage memory on the driver/executors.
AppStatusListener interprets the memory value in block-update events as a "delta" and adjusts the driver/executors' storage memory based on that delta, but when removing a block, BlockManager reports the delta as 0, so the storage memory is never decreased. `BlockManager.dropFromMemory` handles this correctly, so some of the memory-freeing paths were already right; this patch fixes the remaining ones.
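Below is a minimal, self-contained sketch of the delta-based accounting described above (the class and method names are illustrative, not Spark's):
```scala
// Illustrative delta accounting: a removal must report a negative delta,
// otherwise the tracked usage never shrinks and looks like a leak.
case class ExecutorStorageSketch(var memUsed: Long = 0L) {
  def onBlockUpdated(delta: Long): Unit = memUsed += delta
}

object DeltaDemo extends App {
  val exec = ExecutorStorageSketch()
  exec.onBlockUpdated(512L)  // block stored: usage grows to 512
  exec.onBlockUpdated(0L)    // buggy removal report: usage stays at 512
  exec.onBlockUpdated(-512L) // fixed removal report: usage returns to 0
  println(exec.memUsed)
}
```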
### Why are the changes needed?
The storage memory metric in AppStatusListener goes out of sync, which can easily mislead end users into believing a memory leak is happening.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Modified UTs. Also manually tested by running a simple query repeatedly and observing the executor page of the Spark UI to check that the storage memory value decreases as expected.
Please refer to the description of [SPARK-29055](https://issues.apache.org/jira/browse/SPARK-29055) for a simple reproducer.
Closes#25973 from HeartSaVioR/SPARK-29055.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
This PR aims to remove `scalatest` deprecation warnings with the following changes.
- `org.scalatest.mockito.MockitoSugar` -> `org.scalatestplus.mockito.MockitoSugar`
- `org.scalatest.selenium.WebBrowser` -> `org.scalatestplus.selenium.WebBrowser`
- `org.scalatest.prop.Checkers` -> `org.scalatestplus.scalacheck.Checkers`
- `org.scalatest.prop.GeneratorDrivenPropertyChecks` -> `org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks`
### Why are the changes needed?
According to the Jenkins logs, there are 118 warnings about this.
```
grep "is deprecated" ~/consoleText | grep scalatest | wc -l
118
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
After Jenkins passes, we need to check the Jenkins log.
Closes#25982 from dongjoon-hyun/SPARK-29307.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Scala 2.13 emits a deprecation warning for procedure-like declarations:
```
def foo() {
...
```
This is equivalent to the following, so should be changed to avoid a warning:
```
def foo(): Unit = {
...
```
### Why are the changes needed?
It will avoid about a thousand compiler warnings when we start to support Scala 2.13. I wanted to make the change in 3.0, as back-ports from 3.0 to 2.4 are less likely than from 3.1 to 3.0, for example, minimizing the downside of touching so many files.
Unfortunately, that makes this quite a big change.
### Does this PR introduce any user-facing change?
No behavior change at all.
### How was this patch tested?
Existing tests.
Closes#25968 from srowen/SPARK-29291.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR regenerates the benchmark results in the `core` and `mllib` modules in order to compare the JDK8/JDK11 results.
### Why are the changes needed?
According to the results, JDK11 is slightly faster for `PropertiesCloneBenchmark` and `UDTSerializationBenchmark`. In general, there is no regression in JDK11.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
This is a test-only PR. The benchmarks were run manually.
Closes#25969 from dongjoon-hyun/SPARK-29297.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Some of the columns of the JDBC/ODBC server tab in the Web UI are hard to understand.
We documented them in SPARK-28373, but I think it is better to have some tooltips in the SQL statistics table to explain the columns.
![image](https://user-images.githubusercontent.com/12819544/64489775-38e48980-d257-11e9-868a-5f5f6a0f1e46.png)
The columns with new tooltips are finish time, close time, execution time, and duration.
![image](https://user-images.githubusercontent.com/12819544/64489858-1141f100-d258-11e9-9e4e-fae3299da465.png)
The improvements in UIUtils can be reused in other tables in the Web UI to add tooltips.
### Why are the changes needed?
It improves the understandability of the Web UI.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit tests are added, plus manual testing.
Closes#25723 from planga82/feature/SPARK-29019_tooltipjdbcServer.
Lead-authored-by: Unknown <soypab@gmail.com>
Co-authored-by: Pablo <soypab@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to specify the JDK8 default configurations `-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads` explicitly. As we see in this PR [here](https://github.com/apache/spark/pull/25966/files#diff-12b89b7ee67c63c2254b749c8f8d0694R10), this will make the comparison between JDK8 and JDK11 easier by removing a misleading regression.
**NOTE THAT THESE JVM CONFS ARE ONLY FOR BENCHMARK COMPARISON, NOT FOR A PRODUCTION**
### Why are the changes needed?
There exist many JVM-level changes between JDK8 and JDK11. For example, the following are notable changes, and it turns out that (1) and (2) in particular show a misleading regression in our micro-benchmark environment, because our micro-benchmarks use small VM memory.
1. [JEP 248: Make G1 the Default Garbage Collector](https://bugs.openjdk.java.net/browse/JDK-8073273) **JDK9+**
2. [Enable UseDynamicNumberOfGCThreads by default](https://bugs.openjdk.java.net/browse/JDK-8198547) **JDK11+**
3. [Change default value of HeapSizePerGCThread](https://bugs.openjdk.java.net/browse/JDK-8200417) **JDK11+**
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
This is a test-only JVM configuration change. Manually, run the benchmark.
Closes#25966 from dongjoon-hyun/SPARK-29282.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
`availableSlots` is computed once, before the for loop over all TaskSets in `resourceOffers`. But the number of slots changes in every iteration, as slots are taken in each one. The number of available slots checked by a barrier task set therefore has to be recomputed from `availableCpus` in every iteration.
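A hedged sketch of the fix (the helper name is illustrative, not Spark's actual code): recompute the free slots from `availableCpus` on each iteration instead of reusing a value computed once before the loop.
```scala
object SlotsSketch {
  // Free slots derived from the current per-executor free CPUs.
  def freeSlots(availableCpus: Seq[Int], cpusPerTask: Int): Int =
    availableCpus.map(_ / cpusPerTask).sum

  def main(args: Array[String]): Unit = {
    var availableCpus = Seq(4, 4)                      // two executors, 4 free CPUs each
    println(freeSlots(availableCpus, cpusPerTask = 2)) // 4 slots
    availableCpus = Seq(2, 4)                          // CPUs taken by an earlier TaskSet
    println(freeSlots(availableCpus, cpusPerTask = 2)) // 3 slots: must be recomputed
  }
}
```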
### Why are the changes needed?
Bugfix.
This could make `resourceOffers` attempt to launch a barrier task set even though it does not have enough slots available. That would then be caught by the `require` in line 519, which throws an exception; the exception gets caught and ignored by the Dispatcher's MessageLoop, so nothing terrible happens, but it prevents `resourceOffers` from considering further TaskSets.
Note that launching the barrier TaskSet can still fail if other requirements are not satisfied, and can still be rolled back by the exception thrown in this `require`. Handling it more gracefully remains a TODO in SPARK-24818, but this fix should at least resolve the case where the launch fails because of insufficient slots.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added UT
Closes#23375
Closes#25946 from juliuszsompolski/SPARK-29263.
Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
### What changes were proposed in this pull request?
Remove unnecessary imports in `core` module.
### Why are the changes needed?
Clean code for Apache Spark 3.0.0.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Local test.
Closes#25927 from sev7e0/dev_0925.
Authored-by: sev7e0 <sev7e0@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Protect `executorDataMap` with a lock when accessing it outside of `DriverEndpoint`'s methods.
### Why are the changes needed?
Just as the comments:
> // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
> // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
> // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
> // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
> // `CoarseGrainedSchedulerBackend.this`.
`executorDataMap` is not thread-safe, so it should be protected by a lock when accessed outside of `DriverEndpoint`.
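An illustrative sketch of this locking convention (simplified types, not Spark's actual classes):
```scala
import scala.collection.mutable

class BackendSketch {
  private val executorDataMap = new mutable.HashMap[String, Int]()

  // Mutations happen in DriverEndpoint.receive, under the backend's lock.
  def registerExecutor(id: String, cores: Int): Unit = synchronized {
    executorDataMap(id) = cores
  }

  // Reads outside DriverEndpoint must take the same lock.
  def numExistingExecutors: Int = synchronized { executorDataMap.size }
}
```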
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Existing UTs.
Closes#25922 from ConeyLiu/executorDataMap.
Authored-by: Xianyang Liu <xianyang.liu@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Copy any "spark.hive.foo=bar" spark properties into hadoop conf as "hive.foo=bar"
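A minimal sketch of the copy, assuming every "spark.hive.*" entry is mirrored with the "spark." prefix stripped (the helper name is hypothetical):
```scala
import org.apache.hadoop.conf.Configuration

def appendSparkHiveConfigs(sparkProps: Map[String, String], hadoopConf: Configuration): Unit =
  sparkProps.foreach { case (key, value) =>
    if (key.startsWith("spark.hive.")) {
      hadoopConf.set(key.stripPrefix("spark."), value) // "spark.hive.foo" -> "hive.foo"
    }
  }
```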
### Why are the changes needed?
This provides a Spark-side config entry for Hive configurations.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT.
Closes#25661 from WeichenXu123/add_hive_conf.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
When I use `ProcfsMetricsGetterSuite` for testing, it always throws `java.lang.NullPointerException`. I think there is a problem with where `new ProcfsMetricsGetter` is placed, which leads to `SparkEnv` not being initialized in time, and therefore to a `java.lang.NullPointerException` when the method is executed.
### Why are the changes needed?
For test.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Local testing
Closes#25918 from sev7e0/dev_0924.
Authored-by: sev7e0 <sev7e0@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This PR is an enhanced version of https://github.com/apache/spark/pull/25805, so I've kept the original text. The problem with the original PR can be found in the comments.
This situation can happen when an external system (e.g. Oozie) generates
delegation tokens for a Spark application. The Spark driver will then run
against secured services, have proper credentials (the tokens), but no
kerberos credentials. So trying to do things that require a kerberos
credential fails.
Instead, if no kerberos credentials are detected, just skip the whole
delegation token code.
Tested with an application that simulates Oozie; fails before the fix,
passes with the fix. Also with other DT-related tests to make sure other
functionality keeps working.
Closes#25901 from gaborgsomogyi/SPARK-29082.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Modified widths of some checkboxes in StagePage.
### Why are the changes needed?
When we increase the font size of the browsers or the default font size is big, the labels of checkbox of `Show Additional Metrics` in `StagePage` are wrapped like as follows.
![before-modified1](https://user-images.githubusercontent.com/4736016/65449180-634c5e80-de75-11e9-9f27-88f4cc1313b7.png)
![before-modified2](https://user-images.githubusercontent.com/4736016/65449182-63e4f500-de75-11e9-96b8-46e92a61f40c.png)
### Does this PR introduce any user-facing change?
Yes.
### How was this patch tested?
Run the following and visit the `Stage Detail` page. Then, increase the font size.
```
$ bin/spark-shell
...
scala> spark.range(100000).groupBy("id").count.collect
```
Closes#25905 from sarutak/adjust-checkbox-width.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
BarrierCoordinator sets up a TimerTask for a round of global sync. Currently the run method is synchronized on the created TimerTask, but to be synchronized with handleRequest, it should be synchronized on the ContextBarrierState object, not the TimerTask object.
### Why are the changes needed?
ContextBarrierState.handleRequest and TimerTask.run both access the internal state of a ContextBarrierState object. If the TimerTask is not synchronized on the same ContextBarrierState object, then when the timer task is triggered, handleRequest can still accept new requests and modify the requesters field in the ContextBarrierState object. This makes the behavior inconsistent.
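A hedged sketch of the change (simplified, not Spark's actual code): the timer task takes the monitor of the shared state object, the same one handleRequest uses, rather than the TimerTask's own monitor.
```scala
import java.util.TimerTask
import scala.collection.mutable

class BarrierStateSketch {
  private val requesters = mutable.ArrayBuffer.empty[String]

  def handleRequest(caller: String): Unit = synchronized { requesters += caller }

  def timeoutTask(): TimerTask = new TimerTask {
    // Synchronize on the shared state, making run() mutually
    // exclusive with handleRequest on the same object.
    override def run(): Unit = BarrierStateSketch.this.synchronized {
      requesters.clear()
    }
  }
}
```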
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Tested locally.
Closes#25897 from viirya/SPARK-25903.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Do the task handling even when the task result exceeds the configured maxResultSize. More details are in the JIRA description: https://issues.apache.org/jira/browse/SPARK-29177.
### Why are the changes needed?
Without this patch, the zombie tasks prevent YARN from recycling the containers running these tasks, which affects other applications.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
A unit test and a production test with a very large `SELECT` in the Spark Thrift Server.
Closes#25850 from adrian-wang/zombie.
Authored-by: Daoyuan Wang <me@daoyuan.wang>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message.
### What changes were proposed in this pull request?
In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id (a unique id within an application) in the shuffle id, so that each shuffle write attempt has a different file name. For an indeterminate stage, when the stage resubmits, we'll clear all existing map statuses and rerun all partitions.
All changes are summarized as follows:
- Change the mapId to mapTaskAttemptId in shuffle related id.
- Record the mapTaskAttemptId in MapStatus.
- Still keep mapId in ShuffleFetcherIterator for fetch failed scenario.
- Add the determinate flag in Stage and use it in DAGScheduler and the cleaning work for the intermediate stage.
### Why are the changes needed?
This is a follow-up work for #22112's future improvement[1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.`
Spark reruns a finished shuffle write stage when it meets fetch failures; currently, the rerun shuffle map stage only resubmits tasks for the missing partitions and reuses the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), multiple shuffle write attempts may write different data, so rerunning only the missing partitions leads to a correctness bug. So for the shuffle map stage of indeterminate operations, we need to support rolling back the shuffle map stage and regenerating the shuffle files.
### Does this PR introduce any user-facing change?
Yes, after this PR, an indeterminate stage rerun is handled by rerunning the whole stage. The original behavior was to abort the stage and fail the job.
### How was this patch tested?
- UT: Add UT for all changing code and newly added function.
- Manual Test: Also providing a manual test to verify the effect.
```
import scala.sys.process._
import org.apache.spark.TaskContext
val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length
```
It's a simple job with multiple indeterminate stages; it gets a wrong answer on old Spark versions like 2.2/2.3 and is killed after #22112. With this fix, the job can retry all indeterminate stages, as in the screenshot below, and get the right result.
![image](https://user-images.githubusercontent.com/4833765/63948434-3477de00-caab-11e9-9ed1-75abfe6d16bd.png)
Closes#25620 from xuanyuanking/SPARK-25341-8.27.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Correct a word in a log message.
### Why are the changes needed?
The log message will be clearer.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Test is not needed.
Closes#25880 from mdianjun/fix-a-word.
Authored-by: madianjun <madianjun@jd.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Credit to vanzin as he found and commented on this while reviewing #25670 - [comment](https://github.com/apache/spark/pull/25670#discussion_r325383512).
This patch proposes to specify UTF-8 explicitly while reading/writing the event log file.
### Why are the changes needed?
The event log file is being read/written using the default character set of the JVM process, which opens the chance of problems when reading event log files written on other machines. Spark's de facto standard character set is UTF-8, so it should be set explicitly.
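A minimal sketch of the idea: name the charset explicitly rather than relying on the JVM default (the file name and event are illustrative).
```scala
import java.io.{FileOutputStream, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets

val out = new FileOutputStream("eventlog.json")
// UTF-8 is passed explicitly; the platform default charset is never consulted.
val writer = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.println("""{"Event":"SparkListenerLogStart"}""")
writer.close()
```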
### Does this PR introduce any user-facing change?
Yes, if end users have been running Spark processes with a default charset other than UTF-8, especially for their driver JVM processes. No otherwise.
### How was this patch tested?
Existing UTs, as ReplayListenerSuite contains "end-to-end" event logging/reading tests (both uncompressed/compressed).
Closes#25845 from HeartSaVioR/SPARK-29160.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
TransportClientFactory.createClient() is called by tasks, and TransportClientFactory.close() is called by the executor.
When the executor is stopped, close() sets workerGroup = null, and an NPE then occurs in createClient(), which generates many exceptions in the log.
For an exception that occurs after close(), treat it as an expected exception
and transform it into an InterruptedException, which can be processed by the Executor.
### Why are the changes needed?
The change reduces the exception stack traces in the log file, and users won't be confused by these expected exceptions.
### Does this PR introduce any user-facing change?
N/A
### How was this patch tested?
New tests are added in TransportClientFactorySuite and ExecutorSuite
Closes#25759 from colinmjj/spark-19147.
Authored-by: colinma <colinma@tencent.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to extend the existing benchmarks to save JDK9+ result separately.
All `core` module benchmark test results are added. I'll run the other test suites in another PR.
After regenerating all results, we will check JDK11 performance regressions.
### Why are the changes needed?
From Apache Spark 3.0, we support both JDK8 and JDK11. We need to have a way to find the performance regression.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually run the benchmark.
Closes#25873 from dongjoon-hyun/SPARK-JDK11-PERF.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR allows the Python toLocalIterator to prefetch the next partition while the first partition is being collected. The PR also adds a demo micro-benchmark in the examples directory; we may or may not wish to keep it.
### Why are the changes needed?
In https://issues.apache.org/jira/browse/SPARK-23961 / 5e79ae3b40 we changed PySpark to only pull one partition at a time. This is memory efficient, but if partitions take time to compute this can mean we're spending more time blocking.
### Does this PR introduce any user-facing change?
A new parameter is added to toLocalIterator.
### How was this patch tested?
A new unit test inside `test_rdd.py` checks the time at which the elements are evaluated. Another test, checking that the results remain the same, is added to `test_dataframe.py`.
I also ran a micro benchmark in the examples directory `prefetch.py` which shows an improvement of ~40% in this specific use case.
>
> 19/08/16 17:11:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> Running timers:
>
> [Stage 32:> (0 + 1) / 1]
> Results:
>
> Prefetch time:
>
> 100.228110831
>
>
> Regular time:
>
> 188.341721614
>
>
>
Closes#25515 from holdenk/SPARK-27659-allow-pyspark-tolocalitr-to-prefetch.
Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
### What changes were proposed in this pull request?
This patch proposes to increase the timeout for waiting for executor(s) to be up in SparkContextSuite, as we observed these tests failing due to wait timeouts.
### Why are the changes needed?
There are some cases where the CI build is extremely slow, requiring 3x or more time to pass the test.
(https://issues.apache.org/jira/browse/SPARK-29139?focusedCommentId=16934034&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16934034)
Allocating a higher timeout doesn't add latency, as the code checks the condition in a loop, sleeping 10 ms per iteration.
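A hedged sketch of that polling pattern: raising `timeoutMs` costs nothing when the condition is met early.
```scala
def waitUntil(condition: => Boolean, timeoutMs: Long): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!condition) {
    if (System.currentTimeMillis() >= deadline) return false
    Thread.sleep(10) // re-check every 10 ms
  }
  true
}
```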
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A, as the case is not likely to occur frequently.
Closes#25864 from HeartSaVioR/SPARK-29139.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR is a follow-up of https://github.com/apache/spark/pull/25838 and proposes to create an actual test case under `src/test`. Previously, a compile-only test existed under `src/main`.
Also, changed the wording in `SerializableConfiguration` to only describe what it does (removed other words).
### Why are the changes needed?
Test code should live in `src/test`, not `src/main`. Also, it should test basic functionality.
### Does this PR introduce any user-facing change?
No except minor doc change.
### How was this patch tested?
Unit test was added.
Closes#25867 from HyukjinKwon/SPARK-29158.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a follow-up of the [review comment](https://github.com/apache/spark/pull/25706#discussion_r321923311).
This patch unifies the default wait time to 10 seconds, as it fits most UTs (they have smaller timeouts) and doesn't add latency, since the wait returns as soon as the condition is met.
This patch doesn't touch the one place that waits 100000 milliseconds (100 seconds), to not break anything unintentionally, though I find it rather questionable that we really need to wait 100 seconds.
### Why are the changes needed?
It simplifies the test code and gets rid of various heuristic timeout values.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
CI build will test the patch, as it would be the best environment to test the patch (builds are running there).
Closes#25837 from HeartSaVioR/MINOR-unify-default-wait-time-for-wait-until-empty.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Currently `SerializableConfiguration`, which makes the Hadoop configuration serializable, is private. This makes it public, with a developer API annotation.
### Why are the changes needed?
Many data sources depend on the Hadoop configuration, which may have specific components on the driver. Inside Spark's own DataSourceV2 implementations this is frequently used (Parquet, Json, Orc, etc.).
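A hedged usage sketch (assumes a SparkSession `spark` and an RDD `rdd` in scope): wrap the driver's Hadoop conf so it can be shipped to executors.
```scala
import org.apache.spark.util.SerializableConfiguration

val wrapped = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
val broadcastConf = spark.sparkContext.broadcast(wrapped)
rdd.foreachPartition { _ =>
  val hadoopConf = broadcastConf.value.value // the Configuration, on the executor
}
```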
### Does this PR introduce any user-facing change?
This provides a new developer API.
### How was this patch tested?
No new tests are added as this only exposes a previously developed & thoroughly used + tested component.
Closes#25838 from holdenk/SPARK-29158-expose-serializableconfiguration-for-dsv2.
Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This addresses about 15 miscellaneous warnings that appear in the current build.
### Why are the changes needed?
No functional changes, it just slightly reduces the amount of extra warning output.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests, run manually.
Closes#25852 from srowen/BuildWarnings.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This situation can happen when an external system (e.g. Oozie) generates
delegation tokens for a Spark application. The Spark driver will then run
against secured services, have proper credentials (the tokens), but no
kerberos credentials. So trying to do things that require a kerberos
credential fails.
Instead, if no kerberos credentials are detected, just skip the whole
delegation token code.
Tested with an application that simulates Oozie; fails before the fix,
passes with the fix. Also with other DT-related tests to make sure other
functionality keeps working.
Closes#25805 from vanzin/SPARK-29082.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This proposes to improve Spark instrumentation by adding a hook for user-defined metrics, extending Spark’s Dropwizard/Codahale metrics system.
The original motivation of this work was to add instrumentation for S3 filesystem access metrics by Spark jobs. Currently, [[ExecutorSource]] instruments HDFS and local filesystem metrics. Rather than extending the code there, we propose with this JIRA to add a metrics plugin system which is more flexible and of general use.
Context: The Spark metrics system provides a large variety of metrics useful to monitor and troubleshoot Spark workloads. A typical workflow is to sink the metrics to a storage system and build dashboards on top of that.
Highlights:
- The metric plugin system makes it easy to implement instrumentation for S3 access by Spark jobs.
- The metrics plugin system allows for easy extensions of how Spark collects HDFS-related workload metrics. This is currently done using the Hadoop Filesystem GetAllStatistics method, which is deprecated in recent versions of Hadoop. Recent versions of Hadoop Filesystem recommend using method GetGlobalStorageStatistics, which also provides several additional metrics. GetGlobalStorageStatistics is not available in Hadoop 2.7 (had been introduced in Hadoop 2.8). Using a metric plugin for Spark would allow an easy way to “opt in” using such new API calls for those deploying suitable Hadoop versions.
- We also have the use case of adding Hadoop filesystem monitoring for a custom Hadoop compliant filesystem in use in our organization (EOS using the XRootD protocol). The metrics plugin infrastructure makes this easy to do. Others may have similar use cases.
- More generally, this method makes it straightforward to plug in Filesystem and other metrics to the Spark monitoring system. Future work on plugin implementation can address extending monitoring to measure usage of external resources (OS, filesystem, network, accelerator cards, etc), that maybe would not normally be considered general enough for inclusion in Apache Spark code, but that can be nevertheless useful for specialized use cases, tests or troubleshooting.
Implementation:
The proposed implementation extends and modifies the work on Executor Plugin of SPARK-24918. Additionally, this is related to recent work on extending Spark executor metrics, such as SPARK-25228.
As discussed during the review, the implementation of this feature modifies the Developer API for Executor Plugins, such that the new version is incompatible with the original version in Spark 2.4.
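A hypothetical plugin sketch (the names are illustrative, not the exact Developer API): a plugin registers a custom Dropwizard gauge with the executor's metric registry.
```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

class S3MetricsPluginSketch {
  def init(registry: MetricRegistry): Unit = {
    registry.register("s3a.bytesRead", new Gauge[Long] {
      override def getValue: Long = 0L // would query filesystem statistics here
    })
  }
}
```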
## How was this patch tested?
This modifies existing tests for ExecutorPluginSuite to adapt them to the API changes. In addition, the new functionality for registering pluginMetrics has been manually tested running Spark on YARN and K8S clusters, in particular for monitoring S3 and for extending HDFS instrumentation with the Hadoop Filesystem “GetGlobalStorageStatistics” metrics. An executor metric plugin example and the code used for testing are available, for example, at https://github.com/cerndb/SparkExecutorPlugins
Closes#24901 from LucaCanali/executorMetricsPlugin.
Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
HDFS doesn't update the file size reported by the NM if you just keep
writing to the file; this makes the SHS believe the file is inactive,
and so it may delete it after the configured max age for log files.
This change uses hsync to keep the log file as up to date as possible
when using HDFS. It also disables erasure coding by default for these
logs, since hsync (& friends) does not work with EC.
Tested with a SHS configured to aggressively clean up logs; verified
a spark-shell session kept updating the log, which was not deleted by
the SHS.
Closes#25819 from vanzin/SPARK-29105.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
Updating the unit description in configurations, in order to maintain consistency across configurations.
### Why are the changes needed?
The description does not mention the suffix that can be used while configuring this value.
This is for better user understanding.
### Does this PR introduce any user-facing change?
Yes, the doc description.
### How was this patch tested?
Generated the documentation and checked it.
![Screenshot from 2019-09-05 11-09-17](https://user-images.githubusercontent.com/51401130/64314853-07a55880-cfce-11e9-8af0-6416a50b0188.png)
Closes#25689 from PavithraRamachandran/heapsize_config.
Authored-by: Pavithra Ramachandran <pavi.rams@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
In this PR, I fix some annotation errors and remove meaningless annotations in the project.
### Why are the changes needed?
There are some annotation errors and meaningless annotations in the project.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Verified manually.
Closes#25809 from turboFei/SPARK-29113.
Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR changes the `bytesHash(data)` API invocation to the underlying `bytesHash(data, arraySeed)` invocation.
```scala
def bytesHash(data: Array[Byte]): Int = bytesHash(data, arraySeed)
```
### Why are the changes needed?
The original API was changed between Scala versions by the following commit. As of Scala 2.12.9, the semantics of the function changed. If we use the underlying form, we are safe during the Scala version migration.
- 846ee2b1a4 (diff-ac889f851e109fc4387cd738d52ce177)
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
This is a kind of refactoring.
Pass the Jenkins with the existing tests.
Closes#25821 from dongjoon-hyun/SPARK-SCALA-HASH.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
`PipedRDD` invokes `stdinWriterThread.interrupt()` at task completion, and `obj.wait` gets an `InterruptedException`. However, there is a possibility that the thread termination gets delayed, because the thread is woken from `obj.wait()` by that exception. To prevent test flakiness, we need to use `eventually`. This PR also fixes a typo in a code comment and a variable name.
### Why are the changes needed?
```
- stdin writer thread should be exited when task is finished *** FAILED ***
Some(Thread[stdin writer for List(cat),5,]) was not empty (PipedRDDSuite.scala:107)
```
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6867/testReport/junit/org.apache.spark.rdd/PipedRDDSuite/stdin_writer_thread_should_be_exited_when_task_is_finished/
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manual.
We can reproduce the same failure as on Jenkins if we catch `InterruptedException` and sleep longer than the `eventually` timeout inside the test code. The following is an example to reproduce it.
```scala
val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x =>
  try {
    obj.synchronized {
      obj.wait() // make the thread wait here
    }
  } catch {
    case ie: InterruptedException =>
      Thread.sleep(15000)
      throw ie
  }
  x
}
```
Closes#25808 from dongjoon-hyun/SPARK-29104.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
I fix the test "barrier task killed" which is flaky:
* Split the interrupt/no-interrupt tests into separate SparkContexts, to prevent them from influencing each other.
* Only check the exception on partition 0; partition 1 hangs on sleep and may throw a different exception.
### Why are the changes needed?
Make test robust.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
N/A
Closes#25799 from WeichenXu123/oss_fix_barrier_test.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: WeichenXu <weichen.xu@databricks.com>
### What changes were proposed in this pull request?
Log levels in Executor.scala are changed from DEBUG to INFO.
### Why are the changes needed?
Logging level DEBUG is too low here. These messages are simple acknowledgements of successful loading and initialization of plugins, so it's better to keep them at the INFO level.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Manually tested.
Closes#25634 from iRakson/ExecutorPlugin.
Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
The previous refactors of the shuffle writers using the shuffle writer plugin resulted in shuffle write metric updates - particularly write times - being lost in particular situations. This patch restores the lost metric updates.
### Why are the changes needed?
This fixes a regression. I'm pretty sure that without this, the Spark UI will lose shuffle write time information.
### Does this PR introduce any user-facing change?
No change from Spark 2.4. Without this, there would be a user-facing bug in Spark 3.0.
### How was this patch tested?
Existing unit tests.
Closes#25780 from mccheah/fix-write-metrics.
Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
### What changes were proposed in this pull request?
[SPARK-27122](https://github.com/apache/spark/pull/24088) fixes a `ClassCastException` in the `yarn` module by introducing `DelegatingServletContextHandler`. Initially, this was discovered with JDK9+, but the class path issues affected the JDK8 environment, too. After [SPARK-28709](https://github.com/apache/spark/pull/25439), I also hit a similar issue in the `streaming` module.
This PR aims to fix `streaming` module by adding `getContextPath` to `DelegatingServletContextHandler` and using it.
### Why are the changes needed?
Currently, when we test `streaming` module independently, it fails like the following.
```
$ build/mvn test -pl streaming
...
UISeleniumSuite:
- attaching and detaching a Streaming tab *** FAILED ***
java.lang.ClassCastException: org.sparkproject.jetty.servlet.ServletContextHandler cannot be cast to org.eclipse.jetty.servlet.ServletContextHandler
...
Tests: succeeded 337, failed 1, canceled 0, ignored 1, pending 0
*** 1 TEST FAILED ***
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass the Jenkins with the modified tests. And do the following manually.
Since you can only observe this when you run the `streaming` module tests alone (instead of running everything), you need to install the changed `core` module and use it.
```
$ java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.222-b10, mixed mode)
$ build/mvn install -DskipTests
$ build/mvn test -pl streaming
```
Closes#25791 from dongjoon-hyun/SPARK-29087.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Replace use of `SerializationUtils.clone` with new `Utils.cloneProperties` method
Add benchmark + results showing dramatic speed up for effectively equivalent functionality.
### What changes were proposed in this pull request?
While I am not sure that SerializationUtils.clone is a performance issue in production, I am sure that it is overkill for the task it is doing (providing a distinct copy of a `Properties` object).
This PR provides a benchmark showing the dramatic improvement over the clone operation and replaces uses of `SerializationUtils.clone` on `Properties` with the more specialized `Utils.cloneProperties`.
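A minimal sketch of the specialized copy (assuming String-only entries, as noted below): no serialize/deserialize round trip, just a property-by-property copy.
```scala
import java.util.Properties

def cloneProperties(props: Properties): Properties = {
  val copy = new Properties()
  props.stringPropertyNames().forEach { name =>
    copy.setProperty(name, props.getProperty(name))
  }
  copy
}
```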
### Does this PR introduce any user-facing change?
Strings are immutable, so there is no reason to serialize and deserialize them; it just creates extra garbage.
The only functionality that would be changed is the unsupported insertion of non-String objects into the Spark local properties.
### How was this patch tested?
1. Pass the Jenkins with the existing tests.
2. Since this is a performance improvement PR, manually run the benchmark.
Closes#25787 from databricks-david-lewis/SPARK-29081.
Authored-by: David Lewis <david.lewis@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Make r file extension check case insensitive for spark-submit.
### Why are the changes needed?
spark-submit does not accept `.r` files as R scripts. Some codebases have r files that end with lowercase file extensions. It is inconvenient to use spark-submit with lowercase extension R files. The error is not very clear (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala#L232).
```
$ ./bin/spark-submit examples/src/main/r/dataframe.r
Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR file:/Users/dongjoon/APACHE/spark-release/spark-2.4.4-bin-hadoop2.7/examples/src/main/r/dataframe.r
```
### Does this PR introduce any user-facing change?
Yes. spark-submit can now be used to run R scripts with `.r` file extension.
### How was this patch tested?
Manual.
```
$ mv examples/src/main/r/dataframe.R examples/src/main/r/dataframe.r
$ ./bin/spark-submit examples/src/main/r/dataframe.r
```
Closes#25778 from Loquats/r-case.
Authored-by: Andy Zhang <yue.zhang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
During Spark History Server startup, there are two things happening simultaneously that call into `java.nio.file.FileSystems.getDefault()`, and we sometimes hit [JDK-8194653](https://bugs.openjdk.java.net/browse/JDK-8194653).
1) start jetty server
2) start ApplicationHistoryProvider (which reads files from HDFS)
We should do these two things sequentially instead of in parallel.
We introduce a start() method in ApplicationHistoryProvider (and its subclass FsHistoryProvider), and do the initialization inside the start() method instead of the constructor.
In HistoryServer, we explicitly call provider.start() after we call bind() which starts the Jetty server.
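An illustrative sketch of that sequencing (simplified, not Spark's actual classes):
```scala
trait HistoryProviderSketch {
  def start(): Unit // initialization formerly done in the constructor
}

class ServerSketch(provider: HistoryProviderSketch) {
  private def bind(): Unit = { /* start the Jetty server */ }

  def initialize(): Unit = {
    bind()           // 1) start Jetty first
    provider.start() // 2) only then start reading event logs from HDFS
  }
}
```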
### Why are the changes needed?
It is a bug: occasionally, starting the Spark History Server results in a process hang due to a deadlock among threads.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
I stress tested this PR with a bash script to stop and start Spark History Server more than 1000 times, it worked fine. Previously I can only do the stop/start loop less than 10 times before I hit the deadlock issue.
Closes#25705 from shanyu/shanyu-29003.
Authored-by: Shanyu Zhao <shzhao@microsoft.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
We already found and fixed the correctness issue when RDD output is INDETERMINATE. One missing part is sampling-based RDDs. This kind of RDD is order-sensitive to its input. A sampling-based RDD with unordered input should be INDETERMINATE.
### Why are the changes needed?
A sampling-based RDD with unordered input is just like a MapPartitionsRDD with the isOrderSensitive parameter set to true. The RDD output can be different after a rerun.
This is a problem in ML applications.
In ML, sampling is used to prepare training data. The ML algorithm fits the model based on the sampled data. If rerun sample tasks produce different output during model fitting, the ML results will be unreliable and also buggy.
Each sample is random output, but once sampled, the output should be determinate.
### Does this PR introduce any user-facing change?
Previously, a sampling-based RDD could possibly produce different output after a rerun.
After this patch, a sampling-based RDD is INDETERMINATE. For an INDETERMINATE map stage, the Spark scheduler currently retries all the tasks of the failed stage.
### How was this patch tested?
Added test.
Closes#25751 from viirya/sample-order-sensitive.
Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
JIRA: https://issues.apache.org/jira/browse/SPARK-29050
- 'a hdfs' changed to 'an hdfs'
- 'an unique' changed to 'a unique'
- 'an url' changed to 'a url'
- 'a error' changed to 'an error'
Closes#25756 from dengziming/feature_fix_typos.
Authored-by: dengziming <dengziming@growingio.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch ensures that recorded values in a listener are accessed only after letting the listeners fully process all events. To ensure this, this patch adds a new class to hide these values and provide access through methods that enforce the above condition. Without this guard, two threads run concurrently - 1) the listener processing thread and 2) the test main thread - and a race condition can occur.
That's also why we see a very odd thing: an error message saying the condition is met even though the test failed:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means verification failed, yet the condition was met just before constructing the error message.
The guard is properly placed in many spots, but missing in some places. This patch enforces that it can't be missed.
### Why are the changes needed?
The UT fails intermittently, and this patch addresses the flakiness.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Modified UT.
Also made the flaky tests fail artificially by applying a 50 ms sleep in each onXXX method.
![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)
I found 3 tests failing. (They're marked as X. Just ignore the ones marked !, as they failed waiting for the listener within the given timeout, and those tests don't deal with the recorded values - they use a different timeout of 1000 ms rather than 10000 ms for this listener, so they are affected only as a side effect.)
When I applied the same change as in this patch, all tests marked X passed.
Closes#25706 from HeartSaVioR/SPARK-26989.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
PR #22112 fixed the todo added by PR #20393 (SPARK-23207). We can remove it now.
### Why are the changes needed?
In order not to confuse developers.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
no need to test
Closes#25755 from LinhongLiu/remove-todo.
Authored-by: Liu,Linhong <liulinhong@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Uses the APIs introduced in SPARK-28209 in the UnsafeShuffleWriter.
## How was this patch tested?
Since this is just a refactor, existing unit tests should cover the relevant code paths. Micro-benchmarks from the original fork where this code was built show no degradation in performance.
Closes#25304 from mccheah/shuffle-writer-refactor-unsafe-writer.
Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Running spark on yarn, I got
```
java.lang.ClassCastException: org.apache.hadoop.ipc.CallerContext$Builder cannot be cast to scala.runtime.Nothing$
```
Utils.classForName returns Class[Nothing]; I think it should be defined as Class[_] to resolve this issue.
## How was this patch tested?
Not needed.
Closes#25389 from hddong/SPARK-28657-fix-currentContext-Instance-failed.
Lead-authored-by: hongdd <jn_hdd@163.com>
Co-authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
- Remove SQLContext.createExternalTable and Catalog.createExternalTable, deprecated in favor of createTable since 2.2.0, plus tests of deprecated methods
- Remove HiveContext, deprecated in 2.0.0, in favor of `SparkSession.builder.enableHiveSupport`
- Remove deprecated KinesisUtils.createStream methods, plus tests of deprecated methods, deprecated in 2.2.0
- Remove deprecated MLlib (not Spark ML) linear method support, mostly utility constructors and 'train' methods, and associated docs. This includes methods in LinearRegression, LogisticRegression, Lasso, RidgeRegression. These have been deprecated since 2.0.0
- Remove deprecated Pyspark MLlib linear method support, including LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD
- Remove 'runs' argument in KMeans.train() method, which has been a no-op since 2.0.0
- Remove deprecated ChiSqSelector isSorted protected method
- Remove deprecated 'yarn-cluster' and 'yarn-client' master argument in favor of 'yarn' and deploy mode 'cluster', etc
Notes:
- I was not able to remove deprecated DataFrameReader.json(RDD) in favor of DataFrameReader.json(Dataset); the former was deprecated in 2.2.0, but, it is still needed to support Pyspark's .json() method, which can't use a Dataset.
- Looks like SQLContext.createExternalTable was not actually deprecated in Pyspark, but, almost certainly was meant to be? Catalog.createExternalTable was.
- I afterwards noted that the toDegrees, toRadians functions were almost removed fully in SPARK-25908, but Felix suggested keeping just the R version as they hadn't been technically deprecated. I'd like to revisit that. Do we really want the inconsistency? I'm not against reverting it again, but then that implies leaving SQLContext.createExternalTable just in Pyspark too, which seems weird.
- I *kept* LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD in Pyspark, though deprecated, as it is hard to remove them (still used by StreamingLogisticRegressionWithSGD?) and they are not fully removed in Scala. Maybe should not have been deprecated.
### Why are the changes needed?
Deprecated items are easiest to remove in a major release, so we should do so as much as possible for Spark 3. This does not target items deprecated 'recently' as of Spark 2.3, which is still 18 months old.
### Does this PR introduce any user-facing change?
Yes, in that deprecated items are removed from some public APIs.
### How was this patch tested?
Existing tests.
Closes#25684 from srowen/SPARK-28980.
Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
If a Spark task is killed due to an intentional job kill, automated killing of redundant speculative tasks, etc., a ClosedByInterruptException occurs if the task has an unfinished I/O operation on an AbstractInterruptibleChannel. A single cancelled task can result in hundreds of ClosedByInterruptException stack traces being logged.
In this PR, the stack trace of ClosedByInterruptException is no longer logged, just as Executor.run does for InterruptedException.
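A hedged sketch of the suppression (simplified, not the patch's actual code): on a killed task, log a one-line message for ClosedByInterruptException instead of a full stack trace.
```scala
import java.nio.channels.ClosedByInterruptException

def runTask(body: () => Unit): Unit =
  try body()
  catch {
    case e: ClosedByInterruptException =>
      // one line, no stack trace
      Console.err.println(s"Task interrupted during I/O: ${e.getClass.getSimpleName}")
  }
```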
### Why are the changes needed?
Large numbers of spurious exceptions are confusing to users when they are inspecting Spark logs to diagnose other issues.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
Closes#25674 from colinmjj/spark-28340.
Authored-by: colinma <colinma@tencent.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
In spark-shell local mode, the host name in the task page shows as localhost.
This PR changes it to show the machine IP, as shown by "spark.driver.host" in the environment page.
### Why are the changes needed?
To show the proper IP in the host column of the task page.
### Does this PR introduce any user-facing change?
It updates the Spark UI -> Task page -> Host column.
### How was this patch tested?
Verified in the Spark UI.
![image](https://user-images.githubusercontent.com/7912929/64079045-253d9e00-cd00-11e9-8092-26caec4e21dc.png)
Closes#25645 from shivusondur/localhost1.
Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This patch fixes the bug which throws ConcurrentModificationException when job with 0 partition is submitted via DAGScheduler.
### Why are the changes needed?
Without this patch, a structured streaming query throws ConcurrentModificationException, as in the stack trace below:
```
19/09/04 09:48:49 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514)
at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520)
at scala.Option.map(Option.scala:163)
at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519)
at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:149)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:217)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93)
```
Please refer to https://issues.apache.org/jira/browse/SPARK-28967 for a detailed reproducer.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Newly added UT. Also manually tested by running a simple structured streaming query in spark-shell.
Closes#25672 from HeartSaVioR/SPARK-28967.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
`ReplayListenerSuite` depends on a listener class to listen for replayed events. This class was implemented by extending `EventLoggingListener`. `EventLoggingListener` does not log executor metrics update events, but uses them to update internal state; on a stage completion event, it then logs stage executor metrics events using this internal state. As executor metrics update events do not get written to the event log, they do not get replayed. The internal state of the replay listener can therefore be different from the original listener, leading to different stage completion events being logged.
We reimplement the replay listener to simply buffer each and every event it receives. This makes it a simpler yet better tool for verifying the events that get sent through the ReplayListenerBus.
### Why are the changes needed?
As explained above. Tests sometimes fail due to events being received by the `EventLoggingListener` that do not get logged (and thus do not get replayed) but influence other events that get logged.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes#25673 from wypoon/SPARK-28770.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
### What changes were proposed in this pull request?
Use the `KeyLock` added in #25612 to simplify `MapOutputTracker.getStatuses`. The refactoring also brings some improvements:
- `InterruptedException` is no longer swallowed.
- When a shuffle block is fetched, we don't need to wake up unrelated sleeping threads.
### Why are the changes needed?
`MapOutputTracker.getStatuses` is pretty hard to maintain right now because it has a special lock mechanism which we need to pay attention to whenever updating this method. As we can use `KeyLock` to hide the complexity of locking behind a dedicated lock class, it's better to refactor it to make it easy to understand and maintain.
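A simplified sketch of the `KeyLock` idea (Spark's class also cleans up locks after use): callers for the same key serialize, while different keys proceed concurrently, so fetching one shuffle block never wakes up unrelated waiting threads.
```scala
import java.util.concurrent.ConcurrentHashMap

class KeyLockSketch[K] {
  private val locks = new ConcurrentHashMap[K, Object]()

  def withLock[T](key: K)(fn: => T): T = {
    val lock = locks.computeIfAbsent(key, _ => new Object)
    lock.synchronized(fn)
  }
}
```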
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#25680 from zsxwing/getStatuses.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
### What changes were proposed in this pull request?
Replaces some incorrect usages of `new Configuration()`, as it loads the default configs defined by Hadoop.
### Why are the changes needed?
An unexpected config could be accessed instead of the expected one; see SPARK-28203 for an example.
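A hedged sketch of the difference (assumes a SparkContext `sc` in scope): `new Configuration()` loads only the defaults found on the classpath, while the context's conf also carries the spark.hadoop.* overrides.
```scala
import org.apache.hadoop.conf.Configuration

val bare = new Configuration()          // may miss expected settings
val hadoopConf = sc.hadoopConfiguration // honors Spark-side overrides
```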
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existed tests.
Closes#25616 from advancedxy/remove_invalid_configuration.
Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This patch implements dynamic partition pruning by adding a dynamic-partition-pruning filter if there is a partitioned table and a filter on the dimension table. The filter is then planned using a heuristic approach:
1. As a broadcast relation if it is a broadcast hash join. The broadcast relation will then be transformed into a reused broadcast exchange by the `ReuseExchange` rule; or
2. As a duplicated subquery if the estimated benefit of the saved partition-table scan is greater than the estimated cost of the extra scan for the duplicated subquery; otherwise
3. As a bypassed condition (`true`).
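For illustration, the query shape DPP targets, assuming a `spark` session as in spark-shell; the table and column names are made up, with `fact_sales` assumed to be partitioned by `store_id`:
```
// The filter on the dimension table yields a runtime filter on store_id, so only
// fact partitions whose store_id can survive the join filter are scanned.
val pruned = spark.sql("""
  SELECT f.units, s.city
  FROM fact_sales f
  JOIN dim_stores s ON f.store_id = s.store_id
  WHERE s.country = 'DE'
""")
```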
### Why are the changes needed?
This is an important performance feature.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added UT
- Testing DPP by enabling / disabling the reuse broadcast results feature and / or the subquery duplication feature.
- Testing DPP with reused broadcast results.
- Testing the key iterators on different HashedRelation types.
- Testing the packing and unpacking of the broadcast keys in a LongType.
Closes#25600 from maryannxue/dpp.
Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
### What changes were proposed in this pull request?
This patch fixes the bugs in test code itself, FsHistoryProviderSuite.
1. When creating a log file via `newLogFile`, the codec is ignored, leading to a wrong file name. (No one tends to write tests for test code, and the bug doesn't affect existing tests, so it was not easy to catch.)
2. When writing events to a log file via `writeFile`, the metadata (in the case of the new format) gets written to the file regardless of its codec, and the content is then overwritten by another stream, so no Spark version information is available. This affects an existing test, which carried a wrong expected value to work around the bug.
This patch also removes redundant parameter `isNewFormat` in `writeFile`, as according to review comment, Spark no longer supports old format.
### Why are the changes needed?
The section above explains why these are bugs, though they reside only in test code. (Please note that the bugs didn't come from the non-test side of the code.)
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Modified existing UTs, and also read the event log file in the console to confirm the metadata is not overwritten by other content.
Closes#25629 from HeartSaVioR/MINOR-FIX-FsHistoryProviderSuite.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
We are adding other resource type support to the executors and Spark. We should show the resource information for each executor on the UI Executors page.
This also adds a toggle button to show the resources column. It is off by default.
![executorui1](https://user-images.githubusercontent.com/4563792/63891432-c815b580-c9aa-11e9-9f41-62975649efbc.png)
![Screenshot from 2019-08-28 14-56-26](https://user-images.githubusercontent.com/4563792/63891516-fd220800-c9aa-11e9-9fe4-89fcdca37306.png)
### Why are the changes needed?
To show users what resources the executors have, like GPUs, FPGAs, etc.
### Does this PR introduce any user-facing change?
Yes, it introduces UI and REST API changes to show the resources.
### How was this patch tested?
Unit tests and manual UI tests on yarn and standalone modes.
Closes#25613 from tgravescs/SPARK-27489-gpu-ui-latest.
Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
This PR provides a new lock mechanism `KeyLock` to lock with a given key. Also use this new lock in `TorrentBroadcast` to avoid blocking tasks from fetching different broadcast values.
### Why are the changes needed?
`TorrentBroadcast.readObject` uses a global lock so only one task can be fetching the blocks at the same time. This is not optimal if we are running multiple stages concurrently because they should be able to independently fetch their own blocks.
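A minimal sketch of the idea behind `KeyLock`; this is illustrative, not Spark's actual implementation (which, among other things, avoids unbounded growth of the lock map):
```
import java.util.concurrent.ConcurrentHashMap

// Serializes operations that share a key while letting different keys proceed in parallel.
class KeyLock[K] {
  private val locks = new ConcurrentHashMap[K, AnyRef]()

  def withLock[T](key: K)(fn: => T): T = {
    val lock = locks.computeIfAbsent(key, (_: K) => new Object) // one monitor per key
    lock.synchronized { fn }
  }
}

// Usage: tasks fetching different broadcast ids no longer block each other.
// new KeyLock[Long].withLock(broadcastId) { fetchBlocks(broadcastId) }
```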
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#25612 from zsxwing/SPARK-3137.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
### What changes were proposed in this pull request?
The Experimental and Evolving annotations are both (like Unstable) used to express that an API may change. However there are many things in the code that have been marked that way since even Spark 1.x. Per the dev thread, anything introduced at or before Spark 2.3.0 is pretty much 'stable' in that it would not change without a deprecation cycle. Therefore I'd like to remove most of these annotations, along with the `:: Experimental ::` scaladoc tag, and likewise for Python and R.
The changes below can be summarized as:
- Generally, anything introduced at or before Spark 2.3.0 has been unmarked as neither Evolving nor Experimental
- Obviously experimental items like DSv2, Barrier mode, ExperimentalMethods are untouched
- I _did_ unmark a few MLlib classes introduced in 2.4, as I am quite confident they're not going to change (e.g. KolmogorovSmirnovTest, PowerIterationClustering)
It's a big change to review, so I'd suggest scanning the list of _files_ changed to see if any area seems like it should remain partly experimental and examine those.
### Why are the changes needed?
Many of these annotations are incorrect; the APIs are de facto stable. Leaving them also makes legitimate usages of the annotations less meaningful.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#25558 from srowen/SPARK-28855.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.
## How was this patch tested?
Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.
Closes#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.
Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
When starting python processes, set `OMP_NUM_THREADS` to the number of cores allocated to an executor or driver if `OMP_NUM_THREADS` is not already set. Each python process will use the same `OMP_NUM_THREADS` setting, even if workers are not shared.
This avoids creating an OpenMP thread pool for parallel processing with a number of threads equal to the number of cores on the executor and [significantly reduces memory consumption](https://github.com/numpy/numpy/issues/10455). Instead, this threadpool should use the number of cores allocated to the executor, if available. If a setting for number of cores is not available, this doesn't change any behavior. OpenMP is used by numpy and pandas.
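A hedged sketch of the intended logic; the helper name is ours, and the exact plumbing in Spark's worker-launching code may differ:
```
import org.apache.spark.SparkConf

// Respect a user-provided OMP_NUM_THREADS; otherwise fall back to the allocated cores.
// If neither is available, return None and leave OpenMP's default (all cores) alone.
def ompNumThreads(env: Map[String, String], conf: SparkConf): Option[String] =
  env.get("OMP_NUM_THREADS").orElse(
    conf.getOption("spark.executor.cores").orElse(conf.getOption("spark.driver.cores")))
```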
### Why are the changes needed?
To reduce memory consumption for PySpark jobs.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Validated that this reduces Python worker memory consumption by more than 1GB on our cluster.
Closes#25545 from rdblue/SPARK-28843-set-omp-num-cores.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.
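A hedged sketch of the adjusted contract, with the shape inferred from the description rather than copied from the actual interface:
```
// The plugin implementation owns the per-partition lengths and hands them back on
// commit, so the higher-level shuffle writer no longer keeps a duplicate array.
trait ShuffleMapOutputWriter {
  // ... per-partition writer methods elided ...
  def commitAllPartitions(): Array[Long] // partition lengths, indexed by partition id
}
```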
## How was this patch tested?
Existing unit tests.
Closes#25341 from mccheah/dont-redundantly-store-part-lengths.
Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In my application, Spark Streaming is restarted programmatically by stopping the StreamingContext without stopping the SparkContext, then creating/starting a new one. I use this for automatic detection of Kafka topic/partition changes and automatic failover in case of non-fatal exceptions.
However, I noticed that after multiple restarts the driver fails with OOM. While investigating a heap dump I figured out that the StreamingContext object isn't cleared by GC after stopping.
<img width="1901" alt="Screen Shot 2019-08-14 at 12 23 33" src="https://user-images.githubusercontent.com/13151161/63010149-83f4c200-be8e-11e9-9f48-12b6e97839f4.png">
There are several places which hold a reference to it:
1. StreamingTab registers StreamingJobProgressListener, which holds a reference to the StreamingContext, directly on the LiveListenerBus shared queue via the `ssc.sc.addSparkListener(listener)` method invocation. However, this listener isn't unregistered in the stop method (see the sketch after this list).
2. JSON handlers (/streaming/json and /streaming/batch/json) aren't unregistered in SparkUI, while they hold a reference to StreamingJobProgressListener. Basically the same issue affects all the pages; I assume renderJsonHandler should be added to the pageToHandlers cache on attachPage invocation so that it can be unregistered as well on detachPage.
3. SparkUI holds a reference to StreamingJobProgressListener in the corresponding local variable, which isn't cleared after the StreamingContext stops.
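A hedged sketch of the first fix; the method shape is illustrative, not the exact patch:
```
// The listener registered via ssc.sc.addSparkListener(listener) must also be removed
// on stop, otherwise it keeps the StreamingContext reachable from the LiveListenerBus.
def detach(): Unit = {
  ssc.sc.removeSparkListener(listener) // removeSparkListener is the existing SparkContext API
  ssc.sc.ui.foreach(_.detachTab(this))
}
```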
## How was this patch tested?
Added tests to existing test suites.
After I applied these changes via reflection in my app, the OOM on the driver side was gone.
Closes#25439 from choojoyq/SPARK-28709-fix-streaming-context-leak-on-stop.
Authored-by: Nikita Gorbachevsky <nikitag@playtika.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
Resolves [SPARK-28778: Shuffle jobs fail due to incorrect advertised address when running in a virtual network on Mesos](https://issues.apache.org/jira/browse/SPARK-28778).
This patch fixes a bug which occurs when shuffle jobs are launched by Mesos in a virtual network. The Mesos scheduler sets the executor `--hostname` parameter to `0.0.0.0` when `spark.mesos.network.name` is provided. This makes executors use `0.0.0.0` as their advertised address and, in the presence of shuffle, executors fail to fetch shuffle blocks from each other using `0.0.0.0` as the origin. When a virtual network is used, the hostname or IP address is not known upfront and is assigned to the container at start time, so the executor process needs to advertise the correct dynamically assigned address to be reachable by other executors.
Changes:
- added a fallback to `Utils.localHostName()` in Spark executors when `--hostname` is not provided (sketched after this list)
- removed setting executor address to `0.0.0.0` from Mesos scheduler
- refactored the code related to building executor command in Mesos scheduler
- added network configuration support to Docker containerizer
- added unit tests
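A hedged one-line sketch of the fallback; `hostnameOpt` stands in for however the executor parses its `--hostname` argument, and `Utils.localHostName()` is Spark-internal:
```
// If Mesos omits --hostname inside a virtual network, advertise the address the
// container actually received instead of 0.0.0.0.
val hostname = hostnameOpt.getOrElse(org.apache.spark.util.Utils.localHostName())
```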
### Why are the changes needed?
The bug described above prevents Mesos users from running any jobs which involve shuffle due to the inability of executors to fetch shuffle blocks because of incorrect advertised address when virtual network is used.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
- added unit test to `MesosCoarseGrainedSchedulerBackendSuite` which verifies the absence of `--hostname` parameter when `spark.mesos.network.name` is provided and its presence otherwise
- added unit test to `MesosSchedulerBackendUtilSuite` which verifies that `MesosSchedulerBackendUtil.buildContainerInfo` sets network-related properties for Docker containerizer
- unit tests from this repo launched with profiles: `./build/mvn test -Pmesos -Pnetlib-lgpl -Psparkr -Phive -Phive-thriftserver`, build log attached: [mvn.test.log](https://github.com/apache/spark/files/3516891/mvn.test.log)
- integration tests from [DCOS Spark repo](https://github.com/mesosphere/spark-build), more specifically - [test_spark_cni.py](https://github.com/mesosphere/spark-build/blob/master/tests/test_spark_cni.py) which runs a specific [shuffle job](https://github.com/mesosphere/spark-build/blob/master/tests/jobs/scala/src/main/scala/ShuffleApp.scala) and verifies its successful completion, Mesos task network configuration, and IP addresses for both Mesos and Docker containerizers
Closes#25500 from akirillov/DCOS-45840-fix-advertised-ip-in-virtual-networks.
Authored-by: Anton Kirillov <akirillov@mesosophere.io>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Improved warning message in Barrier Execution Mode when required slots > maximum slots.
The new message contains information about required slots, maximum slots and how many times retry failed.
### Why are the changes needed?
Providing users with the number of required slots, the maximum slots, and how many times the retry failed might help them decide what to do:
for example, continue waiting for retries to succeed, or kill the job.
### Does this PR introduce any user-facing change?
Yes.
If `spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3`, we get the following warning message.
Before applying this change:
```
19/08/18 15:18:09 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:24 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:39 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:54 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
... 47 elided
```
After applying this change:
```
19/08/18 16:52:23 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently.
19/08/18 16:52:38 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 1/3 failed).
19/08/18 16:52:53 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 2/3 failed).
19/08/18 16:53:08 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 3/3 failed).
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
... 47 elided
```
### How was this patch tested?
I tested manually using the Spark shell with the following configuration and script, and then checked the log messages.
```
$ bin/spark-shell --master local[2] --conf spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3
scala> sc.parallelize(1 to 100, sc.defaultParallelism+1).barrier.mapPartitions(identity(_)).collect
```
Closes#25487 from sarutak/barrier-exec-mode-warning-message.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
Spark uses Netty 4 directly, but also includes Netty 3 only because transitive dependencies pull it in. Those dependencies (Hadoop HDFS, Zookeeper, Avro) don't seem to need it as used in Spark, so I think we can forcibly exclude it to slim down the dependencies.
Previous attempts were blocked by its usage in Flume, but that dependency has gone away.
https://github.com/apache/spark/pull/15436
### Why are the changes needed?
Mostly to reduce the transitive dependency size and complexity a little bit and avoid triggering spurious security alerts on Netty 3.x usage.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests
Closes#25544 from srowen/SPARK-17875.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Deal with InterruptedException in `BarrierTaskContext.barrier()`.
### Why are the changes needed?
An InterruptedException can happen when the SparkContext local property "spark.job.interruptOnCancel" is set to true.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT.
Closes#25519 from WeichenXu123/barrier_fl.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This change reverts the logic which was introduced as a part of SPARK-24149 and a subsequent followup PR.
With existing logic:
- Spark fails to launch with HDFS federation enabled while trying to get a path to a logical nameservice.
- It gets tokens for unrelated namespaces if they are used in HDFS Federation
- Automatic namespace discovery is supported only if these are on the same cluster.
Rationale for change:
- For accessing data from related namespaces, viewfs should handle getting tokens for Spark.
- For accessing data from unrelated namespaces, the user explicitly specifies them using existing configs, as these could be on the same or a different cluster.
Revert the changes.
## How was this patch tested?
Ran a few manual tests and unit tests.
Closes#24785 from dhruve/bug/SPARK-27937.
Authored-by: Dhruve Ashar <dhruveashar@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Fix the case where canceling a Spark job that uses barrier mode leaves barrier tasks that never exit.
Currently, when Spark tasks are killed, `BarrierTaskContext.barrier()` cannot be interrupted (it blocks on an RPC request), causing the task to block and never exit.
In this PR I implement an RPC interface which supports `abort` in the class `RpcEndpointRef`:
```
def askAbortable[T: ClassTag](
message: Any,
timeout: RpcTimeout): AbortableRpcFuture[T]
```
The returned `AbortableRpcFuture` instance includes an `abort` method so that we can abort the RPC before it times out.
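A hedged sketch of what `AbortableRpcFuture` could look like, inferred from the signature above rather than copied from the implementation:
```
import scala.concurrent.Future

class AbortableRpcFuture[T](val future: Future[T], onAbort: Throwable => Unit) {
  // Fails the in-flight RPC early so a caller blocked on `future` is released.
  def abort(t: Throwable): Unit = onAbort(t)
}
```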
## How was this patch tested?
Unit test added.
Manually test:
### Test code
launch spark-shell via `spark-shell --master local[4]`
and run the following code:
```
sc.setLogLevel("INFO")
import org.apache.spark.BarrierTaskContext
val n = 4
def taskf(iter: Iterator[Int]): Iterator[Int] = {
val context = BarrierTaskContext.get()
val x = iter.next()
if (x % 2 == 0) {
// sleep 6000000 seconds with task killed checking
for (i <- 0 until 6000000) {
Thread.sleep(1000)
if (context.isInterrupted()) {
throw new org.apache.spark.TaskKilledException()
}
}
}
context.barrier()
return Iterator.empty
}
// launch spark job, including 4 tasks, tasks 1/3 will enter `barrier()`, and tasks 0/2 will enter `sleep`
sc.parallelize((0 to n), n).barrier().mapPartitions(taskf).collect()
```
And then press Ctrl+C to exit the running job.
### Before
Press Ctrl+C to exit the running job, then open the Spark UI; we can see that 2 tasks (tasks 1/3) are not killed. They are blocked.
### After
Press Ctrl+C to exit the running job; we can see in the Spark UI that all tasks were killed successfully.
Closes#25235 from WeichenXu123/sc_14848.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Change the logic of collecting indeterminate stages: we should look at stages from mapStage, not failedStage, while handling FetchFailed.
### Why are the changes needed?
The fetch-failed error handling logic originally collected indeterminate stages starting from the fetch-failed stage. In the scenario where the fetch failure happens in the first task of this stage, this logic causes the indeterminate stage to be resubmitted only partially. Eventually, we can end up with a correctness bug.
### Does this PR introduce any user-facing change?
It makes the corner case of an indeterminate stage abort as expected.
### How was this patch tested?
New UT in DAGSchedulerSuite.
Run the integration test below with `local-cluster[5, 2, 5120]` and set `spark.sql.execution.sortBeforeRepartition=false`; it will abort the indeterminate stage as expected:
```
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
throw new Exception("pkill -f -n java".!!)
}
x
}
val r2 = df.distinct.count()
```
Closes#25498 from xuanyuanking/SPARK-28699-followup.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Update Jersey to 2.27+, ideally 2.29, for possible JDK 11 fixes.
## How was this patch tested?
Existing tests.
Closes#25455 from srowen/SPARK-28737.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
PR [#24533](https://github.com/apache/spark/pull/24533/files) prevented retrying fetches to a removed executor.
In my test, I couldn't catch exceptions from
`new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start()`.
Checking the code carefully, the **start()** method handles IOException inside its retry logic and won't throw it out until it reaches maxRetries or meets an exception that is not an IOException.
However, if the executor is dead when we fetch a block, then when we rerun
`RetryingBlockFetcher.BlockFetchStarter.createAndStart()`
we may fail to create a transport client to the dead executor, which throws an IOException.
We should catch this IOException.
### Why are the changes needed?
The old solution was not comprehensive; it didn't cover this case.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes#25469 from AngersZhuuuu/SPARK-27637-FLLOW-UP.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This patch adds the binding classes to enable Spark to switch dataframe output to the S3A zero-rename committers shipping in Hadoop 3.1+. It adds a source tree to the hadoop-cloud-storage module which only compiles with the hadoop-3.2 profile, and contains a binding for normal output and a specific bridge class for Parquet (as the Parquet output format requires a subclass of `ParquetOutputCommitter`).
Commit algorithms are a critical topic. There's no formal proof of correctness, but the algorithms are documented and analysed in [A Zero Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases). That document also reviews the classic v1 and v2 algorithms, IBM's Swift committer, and the one from EMRFS, which they admit was based on the concepts implemented here.
Test-wise:
* There's a public set of scala test suites [on github](https://github.com/hortonworks-spark/cloud-integration)
* We have run integration tests against Spark on Yarn clusters.
* This code has been shipping for ~12 months in HDP-3.x.
Closes#24970 from steveloughran/cloud/SPARK-23977-s3a-committer.
Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
1. PythonHadoopUtil.mapToConf generates a Configuration with loadDefaults disabled
2. Merging Hadoop conf in several places of PythonRDD is made consistent.
## How was this patch tested?
Added a new test, plus existing tests.
Closes#25002 from advancedxy/SPARK-28203.
Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This change implements a few changes to the k8s pod allocator so
that it behaves a little better when dynamic allocation is on.
(i) Allow the application to ramp up immediately when there's a
change in the target number of executors. Without this change,
scaling would only trigger when a change happened in the state of
the cluster, e.g. an executor going down, or when the periodical
snapshot was taken (default every 30s).
(ii) Get rid of pending pod requests, both acknowledged (i.e. Spark
knows that a pod is pending resource allocation) and unacknowledged
(i.e. Spark has requested the pod but the API server hasn't created it
yet), when they're not needed anymore. This avoids starting those
executors to just remove them after the idle timeout, wasting resources
in the meantime.
(iii) Re-work some of the code to avoid unnecessary logging. While not
bad without dynamic allocation, the existing logging was very chatty
when dynamic allocation was on. With the changes, all the useful
information is still there, but only when interesting changes happen.
(iv) Gracefully shut down executors when they become idle. Just deleting
the pod causes a lot of ugly logs to show up, so it's better to ask pods
to exit nicely. That also allows Spark to respect the "don't delete
pods" option when dynamic allocation is on.
Tested on a small k8s cluster running different TPC-DS workloads.
Closes#25236 from vanzin/SPARK-28487.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
In the current UI, we cannot identify which RDDs are barrier RDDs. Visualizing them will make debugging easier.
The following images show the result after this change.
![Screenshot from 2019-07-30 16-30-35](https://user-images.githubusercontent.com/4736016/62110508-83cec100-b2e9-11e9-83b9-bc2e485a4cbe.png)
![Screenshot from 2019-07-30 16-31-09](https://user-images.githubusercontent.com/4736016/62110509-83cec100-b2e9-11e9-9e2e-47c4dae23a52.png)
The pale green boxes are barrier RDDs (we might need to discuss which color is appropriate).
## How was this patch tested?
Tested manually.
The images above were produced by the following operations.
```
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 10)
val rdd3 = rdd1.zip(rdd2).barrier.mapPartitions(identity(_))
val rdd4 = rdd3.map(identity(_))
val rdd5 = rdd4.reduceByKey(_+_)
rdd5.collect
```
Closes#25296 from sarutak/barrierexec-dagviz.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
## What changes were proposed in this pull request?
SPARK-17019 introduced `On Heap Memory` and `Off Heap Memory` as optional metrics.
But they are never displayed because they are set to `display: none` in CSS and there is no way to reveal them.
I know #22595 also tries to resolve this issue, but it would use `additional-metrics.js`.
Initially, `additional-metrics.js` was created for `StagePage`, but `StagePage` currently uses `stagepage.js` to toggle its additional metrics because `DataTable` (one of the jQuery plugins) was introduced and we needed another mechanism to add/remove columns for additional metrics.
Now that `ExecutorsPage` also uses `DataTable`, it might be better to introduce the same mechanism as `StagePage` for additional metrics.
![Screenshot from 2019-08-10 05-37-25](https://user-images.githubusercontent.com/4736016/62807960-c4240f80-bb31-11e9-8e1a-1a44e2f91597.png)
We can then remove `additional-metrics.js`, which is no longer used anywhere.
## How was this patch tested?
After this change is applied, I confirmed `ExecutorsPage` and `StagePage` are properly rendered and all checkboxes for additional metrics work.
Closes#25374 from sarutak/remove-additional-metrics.js.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Currently, on requesting summary metrics, cached data are returned if the current number of "SUCCESS" tasks is the same as the value in cached data.
However, the number of "SUCCESS" tasks is wrong when there are running tasks: in `AppStatusStore`, the KVStore is an `ElementTrackingStore` rather than an `InMemoryStore`, and the counted value is always the number of "SUCCESS" tasks plus "RUNNING" tasks.
Thus, even after the running tasks have finished, the out-of-date cached data is returned.
This PR is to fix the code in getting the number of "SUCCESS" tasks.
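A hedged sketch of the corrected check; the index name is assumed, though `KVStore.count(type, index, value)` is a real method on the store:
```
// Count only tasks indexed as SUCCESS, instead of all live task entries
// (which also include RUNNING ones and therefore overcount).
val completedTaskCount = store.count(classOf[TaskDataWrapper], "status", "SUCCESS")
```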
## How was this patch tested?
Tested manually: run
```
sc.parallelize(1 to 160, 40).map(i => Thread.sleep(i*100)).collect()
```
and keep refreshing the stage page; we can see the task summary metrics are wrong.
### Before fix:
![image](https://user-images.githubusercontent.com/1097932/62560343-6a141780-b8af-11e9-8942-d88540659a93.png)
### After fix:
![image](https://user-images.githubusercontent.com/1097932/62560355-7009f880-b8af-11e9-8ba8-10c083a48d7b.png)
Closes#25369 from gengliangwang/fixStagePage.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
* Add log in `NewHadoopRDD`
* Remove some words in logs related to a specific user API.
## How was this patch tested?
Manual.
Closes#25391 from WeichenXu123/log_sf.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
SPARK-24817 and SPARK-24819 introduced 3 new non-internal properties for barrier execution mode, but they are not documented.
So I've added a section on barrier-mode execution to configuration.md.
## How was this patch tested?
Built the docs using jekyll and confirmed the layout in a browser.
Closes#25370 from sarutak/barrier-exec-mode-conf-doc.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In StagePage, only the first optional column (Scheduler Delay, in this case) appears even though the "Select All" checkbox is checked.
![Screenshot from 2019-08-09 18-46-05](https://user-images.githubusercontent.com/4736016/62771600-8f379e80-bad8-11e9-9faa-6da8d57739d2.png)
The cause is that the wrong method is used to manipulate multiple columns: `columns` should have been used, but `column` was.
I've fixed this issue by replacing `column` with `columns`.
## How was this patch tested?
Confirmed the behavior of the checkbox.
![Screenshot from 2019-08-09 18-54-33](https://user-images.githubusercontent.com/4736016/62771614-98c10680-bad8-11e9-9cc0-5879ac47d1e1.png)
Closes#25397 from sarutak/fix-stagepage.js.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR fixes typos in comments and replaces explicit types with '<>' (the diamond operator) for Java 8+.
## How was this patch tested?
Manually tested.
Closes#25338 from younggyuchun/younggyu.
Authored-by: younggyu chun <younggyuchun@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In high-workload environments, ContextCleaner produces excessive logging at INFO level that does not give much information. In one particular case we saw that the ``INFO ContextCleaner: Cleaned accumulator`` message made up 25-30% of the generated logs. We can log this cleanup information at DEBUG level instead.
## How was this patch tested?
This does not modify any functionality; it just changes ContextCleaner's cleanup log levels to DEBUG.
Closes#25396 from ajithme/logss.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In this PR, we implements a complete process of GPU-aware resources scheduling
in Standalone. The whole process looks like: Worker sets up isolated resources
when it starts up and registers to master along with its resources. And, Master
picks up usable workers according to driver/executor's resource requirements to
launch driver/executor on them. Then, Worker launches the driver/executor after
preparing resources file, which is created under driver/executor's working directory,
with the specified resource addresses (told by master). When the driver/executor finishes,
its resources are recycled back to the worker. Finally, if a worker stops, it
should always release its resources first.
For the case of Workers and Drivers in **client** mode running on the same host, we introduce
a config option named `spark.resources.coordinate.enable` (default true) to indicate
whether Spark should coordinate resources for the user. If `spark.resources.coordinate.enable=false`, the user is responsible for configuring different resources for Workers and Drivers when using a resourcesFile or discovery script. If true, Spark helps the user assign different resources to Workers and Drivers.
The solution for Spark to coordinate resources among Workers and Drivers is:
Generally, use a shared file named *____allocated_resources____.json* to sync allocated
resources info among Workers and Drivers on the same host.
After a Worker or Driver has found all resources using the configured resourcesFile and/or
discovery script during launch, it filters out available resources by excluding those already allocated in *____allocated_resources____.json* and acquires resources from the remainder according to its own requirements. After that, it writes its allocated resources along with its process id (pid) into *____allocated_resources____.json*. The pid (proposed by tgravescs) is used to check whether the allocated resources are still valid in case a Worker or Driver crashes and doesn't release its resources properly. And when a Worker or Driver finishes, it normally always cleans up its own allocated resources in *____allocated_resources____.json*.
Note that we'll always get a file lock before any access to file *____allocated_resources____.json*
and release the lock finally.
Furthermore, we appended resources info in `WorkerSchedulerStateResponse` to work
around master change behaviour in HA mode.
## How was this patch tested?
Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.
Manually tested with client/cluster mode (e.g. multiple workers) in a single node Standalone.
Closes#25047 from Ngone51/SPARK-27371.
Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
This patch keeps things consistent wherever the UTF-8 charset is needed, using `StandardCharsets.UTF_8` instead of the string "UTF-8". If the String form is needed, `StandardCharsets.UTF_8.name()` is used.
This change also brings the benefit of getting rid of `UnsupportedEncodingException`, as we're providing `Charset` instead of `String` whenever possible.
This also changes some private Catalyst helper methods to operate on encodings as `Charset` objects rather than strings.
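For illustration, the convention this patch standardizes on (`java.nio.charset` is the real API):
```
import java.nio.charset.StandardCharsets

val bytes = "spark".getBytes(StandardCharsets.UTF_8) // no UnsupportedEncodingException possible
val name  = StandardCharsets.UTF_8.name()            // "UTF-8", for APIs that still need a String
```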
## How was this patch tested?
Existing unit tests.
Closes#25335 from HeartSaVioR/SPARK-28601.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Currently, PythonBroadcast may delete its data file while a python worker still needs it. This happens because PythonBroadcast overrides the `finalize()` method to delete its data file. So, when GC happens and there are no references to the broadcast variable, it may trigger `finalize()` to delete the
data file. That also means the data under a Python broadcast variable can't be deleted when `unpersist()`/`destroy()` is called, but instead relies on GC.
In this PR, we removed the `finalize()` method, and map the PythonBroadcast data file to a BroadcastBlock (which has the same broadcast id as the broadcast variable that wrapped this PythonBroadcast) when PythonBroadcast is deserialized. As a result, the data file is deleted just like the other pieces of the broadcast variable when `unpersist()`/`destroy()` is called, and no longer relies on GC.
## How was this patch tested?
Added a Python test, and tested manually (verified creating/deleting the broadcast block).
Closes#25262 from Ngone51/SPARK-28486.
Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
After SPARK-27677, the shuffle client not only handles shuffle blocks but is also responsible for locally persisted RDD blocks. For better code scalability and more precise semantics (per the [discussion](https://github.com/apache/spark/pull/24892#discussion_r300173331)), we made several changes here:
- Rename ShuffleClient to BlockStoreClient.
- Correspondingly rename the ExternalShuffleClient to ExternalBlockStoreClient, also change the server-side class from ExternalShuffleBlockHandler to ExternalBlockHandler.
- Move MesosExternalBlockStoreClient to Mesos package.
Note that we keep the name BlockTransferService, because the `Service` contains both client and server, and the name is not referencing the shuffle client only.
## How was this patch tested?
Existing UT.
Closes#25327 from xuanyuanking/SPARK-28593.
Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue size.
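For illustration, setting a per-queue capacity via `SparkConf`; "appStatus" is one of the listener bus's named queues:
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.appStatus.capacity", "20000")
```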
## How was this patch tested?
Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
Closes#25307 from yunzoud/SPARK-28574.
Authored-by: yunzoud <yun.zou@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
Today all registered metric sources are reported to GraphiteSink with no filtering mechanism, although the codahale project does support it.
GraphiteReporter (a ScheduledReporter) from the codahale project requires you to implement and supply the MetricFilter interface (there is only a single implementation by default in the codahale project, MetricFilter.ALL).
This PR proposes adding a regex config to match and filter the metrics reported to the GraphiteSink.
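A hedged sketch of how a regex could map onto codahale's `MetricFilter` (the interface is real; the wiring to the sink config is assumed):
```
import com.codahale.metrics.{Metric, MetricFilter}

// Report only metric keys matching the configured regex; everything else is dropped.
def regexFilter(regex: String): MetricFilter = {
  val pattern = regex.r
  new MetricFilter {
    override def matches(name: String, metric: Metric): Boolean =
      pattern.findFirstIn(name).isDefined
  }
}
```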
## How was this patch tested?
Included a GraphiteSinkSuite that tests:
1. Absence of regex filter (existing default behavior maintained)
2. Presence of `regex=<regexexpr>` correctly filters metric keys
Closes#25232 from nkarpov/graphite_regex.
Authored-by: Nick Karpov <nick@nickkarpov.com>
Signed-off-by: jerryshao <jerryshao@tencent.com>
There's a small, probably very hard to hit thread-safety issue in the blacklist
abort timers in the task scheduler, where they access a non-thread-safe map without
locks.
In the tests, the code was also calling methods on the TaskSetManager without
holding the proper locks, which could cause threads to call non-thread-safe
TSM methods concurrently.
Closes#25317 from vanzin/SPARK-28584.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval.
For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.
We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`.
We continue to do the stage-level aggregation in the EventLoggingListener.
For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.
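A hedged sketch of the per-stage peak bookkeeping described above; the types and names are assumed for illustration:
```
import java.util.concurrent.ConcurrentHashMap

// (stageId, stageAttemptId) -> peak metric values seen since the last heartbeat.
val stagePeaks = new ConcurrentHashMap[(Int, Int), Array[Long]]()

// Called by the poller: fold the current readings into every active stage's peaks.
def pollAndUpdate(current: Array[Long]): Unit = {
  stagePeaks.forEach { (_, peaks) =>
    for (i <- current.indices) {
      peaks(i) = math.max(peaks(i), current(i))
    }
  }
}
```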
## How was this patch tested?
Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.
Closes#23767 from wypoon/wypoon_SPARK-26329.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request?
Log in the driver when loading a single large unsplittable file via `sc.textFile` or the csv/json datasource.
The current conditions triggering the logging are:
* only one partition is generated
* the file is unsplittable, for one of these possible reasons:
  - compressed by an unsplittable compression algorithm such as gzip
  - multiLine mode in the csv/json datasource
  - wholeText mode in the text datasource
* the file size exceeds the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)
## How was this patch tested?
Manually tested.
Generate one gzip file exceeding 1GB,
```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```
then launch spark-shell,
run
```
sc.textFile("file:///path/to/file1.gz").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```
run
```
sc.textFile("file:///path/to/file1.txt").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```
run
```
spark.read.csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```
run
```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```
The JSON and text datasources were also tested with similar cases.
Closes#25134 from WeichenXu123/log_gz.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The issue is that the test tried to stop an existing scheduler and replace it with
a new one set up for the test. That can cause issues because both were sharing the
same RpcEnv underneath, and unregistering RpcEndpoints is actually asynchronous
(see comment in Dispatcher.unregisterRpcEndpoint). So that could lead to races where
the new scheduler tried to register before the old one was fully unregistered.
The updated test avoids the issue by using a separate RpcEnv / scheduler instance
altogether, and also avoids a misleading NPE in the test logs.
Closes#25318 from vanzin/SPARK-24352.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>