Commit graph

7244 commits

Author SHA1 Message Date
Holden Karau bd05339171 [SPARK-29158][SQL] Expose SerializableConfiguration for DataSource V2 developers
### What changes were proposed in this pull request?

Currently the SerializableConfiguration, which makes the Hadoop configuration serializable is private. This makes it public, with a developer annotation.

### Why are the changes needed?

Many data source depend on the Hadoop configuration which may have specific components on the driver. Inside of Spark's own DataSourceV2 implementations this is frequently used (Parquet, Json, Orc, etc.)

### Does this PR introduce any user-facing change?

This provides a new developer API.

### How was this patch tested?

No new tests are added as this only exposes a previously developed & thoroughly used + tested component.

Closes #25838 from holdenk/SPARK-29158-expose-serializableconfiguration-for-dsv2.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-20 14:39:24 +09:00
Dongjoon Hyun 76ebf2241a Revert "[SPARK-29082][CORE] Skip delegation token generation if no credentials are available"
This reverts commit f32f16fd68.
2019-09-19 17:54:42 -07:00
Sean Owen c5d8a51f3b [MINOR][BUILD] Fix about 15 misc build warnings
### What changes were proposed in this pull request?

This addresses about 15 miscellaneous warnings that appear in the current build.

### Why are the changes needed?

No functional changes, it just slightly reduces the amount of extra warning output.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests, run manually.

Closes #25852 from srowen/BuildWarnings.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-19 11:37:42 -07:00
Marcelo Vanzin f32f16fd68 [SPARK-29082][CORE] Skip delegation token generation if no credentials are available
This situation can happen when an external system (e.g. Oozie) generates
delegation tokens for a Spark application. The Spark driver will then run
against secured services, have proper credentials (the tokens), but no
kerberos credentials. So trying to do things that requires a kerberos
credential fails.

Instead, if no kerberos credentials are detected, just skip the whole
delegation token code.

Tested with an application that simulates Oozie; fails before the fix,
passes with the fix. Also with other DT-related tests to make sure other
functionality keeps working.

Closes #25805 from vanzin/SPARK-29082.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-18 13:30:00 -07:00
Luca Canali cd481773c3 [SPARK-28091][CORE] Extend Spark metrics system with user-defined metrics using executor plugins
## What changes were proposed in this pull request?

This proposes to improve Spark instrumentation by adding a hook for user-defined metrics, extending Spark’s Dropwizard/Codahale metrics system.
The original motivation of this work was to add instrumentation for S3 filesystem access metrics by Spark job. Currently, [[ExecutorSource]] instruments HDFS and local filesystem metrics. Rather than extending the code there, we proposes with this JIRA to add a metrics plugin system which is of more flexible and general use.
Context: The Spark metrics system provides a large variety of metrics, see also , useful to  monitor and troubleshoot Spark workloads. A typical workflow is to sink the metrics to a storage system and build dashboards on top of that.
Highlights:
-	The metric plugin system makes it easy to implement instrumentation for S3 access by Spark jobs.
-	The metrics plugin system allows for easy extensions of how Spark collects HDFS-related workload metrics. This is currently done using the Hadoop Filesystem GetAllStatistics method, which is deprecated in recent versions of Hadoop. Recent versions of Hadoop Filesystem recommend using method GetGlobalStorageStatistics, which also provides several additional metrics. GetGlobalStorageStatistics is not available in Hadoop 2.7 (had been introduced in Hadoop 2.8). Using a metric plugin for Spark would allow an easy way to “opt in” using such new API calls for those deploying suitable Hadoop versions.
-	We also have the use case of adding Hadoop filesystem monitoring for a custom Hadoop compliant filesystem in use in our organization (EOS using the XRootD protocol). The metrics plugin infrastructure makes this easy to do. Others may have similar use cases.
-	More generally, this method makes it straightforward to plug in Filesystem and other metrics to the Spark monitoring system. Future work on plugin implementation can address extending monitoring to measure usage of external resources (OS, filesystem, network, accelerator cards, etc), that maybe would not normally be considered general enough for inclusion in Apache Spark code, but that can be nevertheless useful for specialized use cases, tests or troubleshooting.

Implementation:
The proposed implementation extends and modifies the work on Executor Plugin of SPARK-24918. Additionally, this is related to recent work on extending Spark executor metrics, such as SPARK-25228.
As discussed during the review, the implementaiton of this feature modifies the Developer API for Executor Plugins, such that the new version is incompatible with the original version in Spark 2.4.

## How was this patch tested?

This modifies existing tests for ExecutorPluginSuite to adapt them to the API changes. In addition, the new funtionality for registering pluginMetrics has been manually tested running Spark on YARN and K8S clusters, in particular for monitoring S3 and for extending HDFS instrumentation with the Hadoop Filesystem “GetGlobalStorageStatistics” metrics. Executor metric plugin example and code used for testing are available, for example at: https://github.com/cerndb/SparkExecutorPlugins

Closes #24901 from LucaCanali/executorMetricsPlugin.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-18 10:32:10 -07:00
Marcelo Vanzin 276aaaae8d [SPARK-29105][CORE] Keep driver log file size up to date in HDFS
HDFS doesn't update the file size reported by the NM if you just keep
writing to the file; this makes the SHS believe the file is inactive,
and so it may delete it after the configured max age for log files.

This change uses hsync to keep the log file as up to date as possible
when using HDFS. It also disables erasure coding by default for these
logs, since hsync (& friends) does not work with EC.

Tested with a SHS configured to aggressively clean up logs; verified
a spark-shell session kept updating the log, which was not deleted by
the SHS.

Closes #25819 from vanzin/SPARK-29105.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-18 09:11:55 -07:00
Pavithra Ramachandran 600a2a4ae5 [SPARK-28972][DOCS] Updating unit description in configurations, to maintain consistency
### What changes were proposed in this pull request?
Updating unit description in configurations, inorder to maintain consistency across configurations.

### Why are the changes needed?
the description does not mention about suffix that can be mentioned while configuring this value.
For better user understanding

### Does this PR introduce any user-facing change?
yes. Doc description

### How was this patch tested?
generated document and checked.
![Screenshot from 2019-09-05 11-09-17](https://user-images.githubusercontent.com/51401130/64314853-07a55880-cfce-11e9-8af0-6416a50b0188.png)

Closes #25689 from PavithraRamachandran/heapsize_config.

Authored-by: Pavithra Ramachandran <pavi.rams@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-18 09:11:15 -05:00
turbofei eef5e6d348 [SPARK-29113][DOC] Fix some annotation errors and remove meaningless annotations in project
### What changes were proposed in this pull request?

In this PR, I fix some annotation errors and remove meaningless annotations in project.
### Why are the changes needed?
There are some annotation errors and meaningless annotations in project.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Verified manually.

Closes #25809 from turboFei/SPARK-29113.

Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-18 13:12:18 +09:00
Dongjoon Hyun 3ece8ee157 [SPARK-29124][CORE] Use MurmurHash3 bytesHash(data, seed) instead of bytesHash(data)
### What changes were proposed in this pull request?

This PR changes `bytesHash(data)` API invocation with the underlaying `byteHash(data, arraySeed)` invocation.
```scala
def bytesHash(data: Array[Byte]): Int = bytesHash(data, arraySeed)
```

### Why are the changes needed?

The original API is changed between Scala versions by the following commit. From Scala 2.12.9, the semantic of the function is changed. If we use the underlying form, we are safe during Scala version migration.
- 846ee2b1a4 (diff-ac889f851e109fc4387cd738d52ce177)

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This is a kind of refactoring.

Pass the Jenkins with the existing tests.

Closes #25821 from dongjoon-hyun/SPARK-SCALA-HASH.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-18 10:33:03 +09:00
Chris Martin 05988b256e [SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs
### What changes were proposed in this pull request?

Adds a new cogroup Pandas UDF.  This allows two grouped dataframes to be cogrouped together and apply a (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame UDF to each cogroup.

**Example usage**

```
from pyspark.sql.functions import pandas_udf, PandasUDFType
df1 = spark.createDataFrame(
   [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
   ("time", "id", "v1"))

df2 = spark.createDataFrame(
   [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
   def asof_join(l, r):
      return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()

```

        +--------+---+---+---+
        |    time| id| v1| v2|
        +--------+---+---+---+
        |20000101|  1|1.0|  x|
        |20000102|  1|3.0|  x|
        |20000101|  2|2.0|  y|
        |20000102|  2|4.0|  y|
        +--------+---+---+---+

### How was this patch tested?

Added unit test test_pandas_udf_cogrouped_map

Closes #24981 from d80tb7/SPARK-27463-poc-arrow-stream.

Authored-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-09-17 17:13:50 -07:00
Dongjoon Hyun 34915b22ab [SPARK-29104][CORE][TESTS] Fix PipedRDDSuite to use eventually to check thread termination
### What changes were proposed in this pull request?

`PipedRDD` will invoke `stdinWriterThread.interrupt()` at task completion, and `obj.wait` will get `InterruptedException`. However, there exists a possibility which the thread termination gets delayed because the thread starts from `obj.wait()` with that exception. To prevent test flakiness, we need to use `eventually`. Also, This PR fixes the typo in code comment and variable name.

### Why are the changes needed?

```
- stdin writer thread should be exited when task is finished *** FAILED ***
  Some(Thread[stdin writer for List(cat),5,]) was not empty (PipedRDDSuite.scala:107)
```

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6867/testReport/junit/org.apache.spark.rdd/PipedRDDSuite/stdin_writer_thread_should_be_exited_when_task_is_finished/

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual.

We can reproduce the same failure like Jenkins if we catch `InterruptedException` and sleep longer than the `eventually` timeout inside the test code. The following is the example to reproduce it.
```scala
val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x =>
  try {
    obj.synchronized {
      obj.wait() // make the thread waits here.
    }
  } catch {
    case ie: InterruptedException =>
      Thread.sleep(15000)
      throw ie
  }
  x
}
```

Closes #25808 from dongjoon-hyun/SPARK-29104.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-17 20:21:25 +09:00
WeichenXu 104b9b6f8c [SPARK-28483][FOLLOW-UP] Fix flaky test in BarrierTaskContextSuite
### What changes were proposed in this pull request?

I fix the test "barrier task killed" which is flaky:

* Split interrupt/no interrupt test into separate sparkContext. Prevent them to influence each other.
* only check exception on partiton-0. partition-1 is hang on sleep which may throw other exception.

### Why are the changes needed?
Make test robust.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
N/A

Closes #25799 from WeichenXu123/oss_fix_barrier_test.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: WeichenXu <weichen.xu@databricks.com>
2019-09-17 19:08:09 +08:00
iRakson 79b10a1aab [SPARK-28929][CORE] Spark Logging level should be INFO instead of DEBUG in Executor Plugin API
### What changes were proposed in this pull request?

Log levels in Executor.scala are changed from DEBUG to INFO.

### Why are the changes needed?

Logging level DEBUG is too low here. These messages are simple acknowledgement for successful loading and initialization of plugins. So its better to keep them in INFO level.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manually tested.

Closes #25634 from iRakson/ExecutorPlugin.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-17 00:53:12 -07:00
mcheah 67751e2694 [SPARK-29072][CORE] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and SortShuffleWriter
### What changes were proposed in this pull request?
The previous refactors of the shuffle writers using the shuffle writer plugin resulted in shuffle write metric updates - particularly write times - being lost in particular situations. This patch restores the lost metric updates.

### Why are the changes needed?
This fixes a regression. I'm pretty sure that without this, the Spark UI will lose shuffle write time information.

### Does this PR introduce any user-facing change?
No change from Spark 2.4. Without this, there would be a user-facing bug in Spark 3.0.

### How was this patch tested?
Existing unit tests.

Closes #25780 from mccheah/fix-write-metrics.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-09-16 09:08:25 -05:00
Dongjoon Hyun 729b3180bc [SPARK-29087][CORE][STREAMING] Use DelegatingServletContextHandler to avoid CCE
### What changes were proposed in this pull request?

[SPARK-27122](https://github.com/apache/spark/pull/24088) fixes `ClassCastException` at `yarn` module by introducing `DelegatingServletContextHandler`. Initially, this was discovered with JDK9+, but the class path issues affected JDK8 environment, too. After [SPARK-28709](https://github.com/apache/spark/pull/25439), I also hit the similar issue at `streaming` module.

This PR aims to fix `streaming` module by adding `getContextPath` to `DelegatingServletContextHandler` and using it.

### Why are the changes needed?

Currently, when we test `streaming` module independently, it fails like the following.
```
$ build/mvn test -pl streaming
...
UISeleniumSuite:
- attaching and detaching a Streaming tab *** FAILED ***
  java.lang.ClassCastException: org.sparkproject.jetty.servlet.ServletContextHandler cannot be cast to org.eclipse.jetty.servlet.ServletContextHandler
...
Tests: succeeded 337, failed 1, canceled 0, ignored 1, pending 0
*** 1 TEST FAILED ***
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the modified tests. And do the following manually.
Since you can observe this when you run `streaming` module test only (instead of running all), you need to install the changed `core` module and use it.

```
$ java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.222-b10, mixed mode)
$ build/mvn install -DskipTests
$ build/mvn test -pl streaming
```

Closes #25791 from dongjoon-hyun/SPARK-29087.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-15 10:15:49 -07:00
David Lewis 8c0e961f6c [SPARK-29081][CORE] Replace calls to SerializationUtils.clone on properties with a faster implementation
Replace use of `SerializationUtils.clone` with new `Utils.cloneProperties` method
Add benchmark + results showing dramatic speed up for effectively equivalent functionality.

### What changes were proposed in this pull request?
While I am not sure that SerializationUtils.clone is a performance issue in production, I am sure that it is overkill for the task it is doing (providing a distinct copy of a `Properties` object).
This PR provides a benchmark showing the dramatic improvement over the clone operation and replaces uses of `SerializationUtils.clone` on `Properties` with the more specialized `Utils.cloneProperties`.

### Does this PR introduce any user-facing change?
Strings are immutable so there is no reason to serialize and deserialize them, it just creates extra garbage.
The only functionality that would be changed is the unsupported insertion of non-String objects into the spark local properties.

### How was this patch tested?

1. Pass the Jenkins with the existing tests.
2. Since this is a performance improvement PR, manually run the benchmark.

Closes #25787 from databricks-david-lewis/SPARK-29081.

Authored-by: David Lewis <david.lewis@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-15 00:28:32 -07:00
Andy Zhang 956f6e988c [SPARK-29080][CORE][SPARKR] Support R file extension case-insensitively
### What changes were proposed in this pull request?

Make r file extension check case insensitive for spark-submit.

### Why are the changes needed?

spark-submit does not accept `.r` files as R scripts. Some codebases have r files that end with lowercase file extensions. It is inconvenient to use spark-submit with lowercase extension R files. The error is not very clear (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala#L232).

```
$ ./bin/spark-submit examples/src/main/r/dataframe.r
Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR file:/Users/dongjoon/APACHE/spark-release/spark-2.4.4-bin-hadoop2.7/examples/src/main/r/dataframe.r
```

### Does this PR introduce any user-facing change?

Yes. spark-submit can now be used to run R scripts with `.r` file extension.

### How was this patch tested?

Manual.

```
$ mv examples/src/main/r/dataframe.R examples/src/main/r/dataframe.r
$ ./bin/spark-submit examples/src/main/r/dataframe.r
```

Closes #25778 from Loquats/r-case.

Authored-by: Andy Zhang <yue.zhang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-15 00:17:11 -07:00
Shanyu Zhao 174d9104cf [SPARK-29003][CORE] Add start method to ApplicationHistoryProvider to avoid deadlock on startup
### What changes were proposed in this pull request?

During Spark History Server startup, there are two things happening simultaneously that call into `java.nio.file.FileSystems.getDefault()` and we sometime hit [JDK-8194653](https://bugs.openjdk.java.net/browse/JDK-8194653).
1) start jetty server
2) start ApplicationHistoryProvider (which reads files from HDFS)

We should do these two things sequentially instead of in parallel.
We introduce a start() method in ApplicationHistoryProvider (and its subclass FsHistoryProvider), and we do initialize inside the start() method instead of the constructor.
In HistoryServer, we explicitly call provider.start() after we call bind() which starts the Jetty server.

### Why are the changes needed?
It is a bug that occasionally starting Spark History Server results in process hang due to deadlock among threads.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I stress tested this PR with a bash script to stop and start Spark History Server more than 1000 times, it worked fine. Previously I can only do the stop/start loop less than 10 times before I hit the deadlock issue.

Closes #25705 from shanyu/shanyu-29003.

Authored-by: Shanyu Zhao <shzhao@microsoft.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-13 21:09:17 -07:00
Dongjoon Hyun 8f057a9612 [SPARK-29032][CORE] Add PrometheusServlet to monitor Master/Worker/Driver
### What changes were proposed in this pull request?

This PR aims to simplify `Prometheus` support by adding `PrometheusServlet`. The main use cases are `K8s` and `Spark Standalone` cluster environments.

### Why are the changes needed?

Prometheus.io is a CNCF project used widely with K8s.
- https://github.com/prometheus/prometheus

For `Master/Worker/Driver`, `Spark JMX Sink` and `Prometheus JMX Converter` combination is used in many cases. One way to achieve that is having the followings.

**JMX Sink (conf/metrics.properties)**
```
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
```

**JMX Converter(conf/spark-env.sh)**
- https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.12.0/jmx_prometheus_javaagent-0.12.0.jar
```
export SPARK_DAEMON_JAVA_OPTS=
"-javaagent:${PWD}/jmx_prometheus_javaagent-${JMX_PROMETHEUS_VERSION}.jar=
${PORT_AGENT}:jmx_prometheus.yaml"
```

This agent approach requires `PORT_AGENT` additionally. Instead, this PR natively support `Prometheus` format exporting with reusing REST API port for the better UX.

### Does this PR introduce any user-facing change?

Yes. New web interfaces are added along with the existing JSON API.

|              |                         JSON End Point                       |            Prometheus End Point          |
| ------- | ------------------------------------------- | ---------------------------------- |
| Master | /metrics/master/json/                                     | /metrics/master/prometheus/          |
| Master | /metrics/applications/json/                             | /metrics/applications/prometheus/ |
| Worker | /metrics/json/                                                   | /metrics/prometheus/                      |
| Driver   | /metrics/json/                                                   | /metrics/prometheus/                      |

### How was this patch tested?

Manually connect the new end-points with `curl`.

**Setup (Master/Worker/Driver)**
Add the followings at `conf/metrics.properties` (`conf/metrics.properties.template` has these examples)
```
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
```
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077
```

```
$ curl -s http://localhost:8080/metrics/master/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "master.aliveWorkers": {
      "value": 1
    },
    "master.apps": {
      "value": 1
    },
    "master.waitingApps": {
      "value": 0
    },
    "master.workers": {
      "value": 1
    }
  },
...
$ curl -s http://localhost:8080/metrics/master/prometheus/ | grep master
metrics_master_aliveWorkers_Value 1
metrics_master_apps_Value 1
metrics_master_waitingApps_Value 0
metrics_master_workers_Value 1
```

```
$ curl -s http://localhost:8080/metrics/applications/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "application.Spark shell.1568261490667.cores": {
      "value": 16
    },
    "application.Spark shell.1568261490667.runtime_ms": {
      "value": 108966
    },
    "application.Spark shell.1568261490667.status": {
      "value": "RUNNING"
    }
  },
...
$ curl -s http://localhost:8080/metrics/applications/prometheus/ | grep application
metrics_application_Spark_shell_1568261490667_cores_Value 16
metrics_application_Spark_shell_1568261490667_runtime_ms_Value 143174
```

```
$ curl -s http://localhost:8081/metrics/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "worker.coresFree": {
      "value": 0
    },
    "worker.coresUsed": {
      "value": 16
    },
    "worker.executors": {
      "value": 1
    },
    "worker.memFree_MB": {
      "value": 30720
    },
    "worker.memUsed_MB": {
      "value": 1024
    }
  },
...
$ curl -s http://localhost:8081/metrics/prometheus/ | grep worker
metrics_worker_coresFree_Value 0
metrics_worker_coresUsed_Value 16
metrics_worker_executors_Value 1
metrics_worker_memFree_MB_Value 30720
metrics_worker_memUsed_MB_Value 1024
```

```
$ curl -s http://localhost:4040/metrics/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "app-20190911211130-0000.driver.BlockManager.disk.diskSpaceUsed_MB": {
      "value": 0
    },
    "app-20190911211130-0000.driver.BlockManager.memory.maxMem_MB": {
      "value": 732
    },
    "app-20190911211130-0000.driver.BlockManager.memory.maxOffHeapMem_MB": {
      "value": 0
    },
    "app-20190911211130-0000.driver.BlockManager.memory.maxOnHeapMem_MB": {
      "value": 732
    },
...
$ curl -s http://localhost:4040/metrics/prometheus/ | head -n5
metrics_app_20190911211130_0000_driver_BlockManager_disk_diskSpaceUsed_MB_Value 0
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxMem_MB_Value 732
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxOffHeapMem_MB_Value 0
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxOnHeapMem_MB_Value 732
metrics_app_20190911211130_0000_driver_BlockManager_memory_memUsed_MB_Value 0
```

Closes #25769 from dongjoon-hyun/SPARK-29032-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-09-13 21:31:21 +00:00
Dongjoon Hyun bbfaadb280 [SPARK-29064][CORE] Add PrometheusResource to export Executor metrics
### What changes were proposed in this pull request?

At Apache Spark 3.0.0, [SPARK-23429](https://github.com/apache/spark/pull/21221) added the ability to collect executor metrics via heartbeats and to expose it as a REST API. This PR aims to extend it to support `Prometheus` format additionally.

### Why are the changes needed?

Prometheus.io is a CNCF project used widely with K8s.
- https://github.com/prometheus/prometheus

### Does this PR introduce any user-facing change?

Yes. New web interfaces are added along with the existing JSON API.

|              |                JSON End Point                    |            Prometheus End Point         |
| ------- | ------------------------------------ | --------------------------------- |
| Driver   | /api/v1/applications/{id}/executors/   | /metrics/executors/prometheus/   |

### How was this patch tested?

Manually connect to the new end-points with `curl` and compare with JSON.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**JSON (existing after SPARK-23429)**
```
$ curl -s http://localhost:4040/api/v1/applications/app-20190911204823-0000/executors
[ {
  "id" : "driver",
  "hostPort" : "localhost:52615",
  "isActive" : true,
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "totalCores" : 0,
  "maxTasks" : 0,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 0,
  "totalTasks" : 0,
  "totalDuration" : 0,
  "totalGCTime" : 0,
  "totalInputBytes" : 0,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 0,
  "isBlacklisted" : false,
  "maxMemory" : 384093388,
  "addTime" : "2019-09-12T03:48:23.875GMT",
  "executorLogs" : { },
  "memoryMetrics" : {
    "usedOnHeapStorageMemory" : 0,
    "usedOffHeapStorageMemory" : 0,
    "totalOnHeapStorageMemory" : 384093388,
    "totalOffHeapStorageMemory" : 0
  },
  "blacklistedInStages" : [ ],
  "peakMemoryMetrics" : {
    "JVMHeapMemory" : 229995952,
    "JVMOffHeapMemory" : 145872280,
    "OnHeapExecutionMemory" : 0,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 0,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 0,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 75891,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 0,
    "ProcessTreeJVMRSSMemory" : 0,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 8,
    "MinorGCTime" : 82,
    "MajorGCCount" : 3,
    "MajorGCTime" : 128
  },
  "attributes" : { },
  "resources" : { }
}, {
  "id" : "0",
  "hostPort" : "127.0.0.1:52619",
  "isActive" : true,
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "totalCores" : 16,
  "maxTasks" : 16,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 0,
  "totalTasks" : 0,
  "totalDuration" : 0,
  "totalGCTime" : 0,
  "totalInputBytes" : 0,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 0,
  "isBlacklisted" : false,
  "maxMemory" : 384093388,
  "addTime" : "2019-09-12T03:48:25.907GMT",
  "executorLogs" : {
    "stdout" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stdout",
    "stderr" : "http://127.0.0.1:8081/logPage/?appId=app-20190911204823-0000&executorId=0&logType=stderr"
  },
  "memoryMetrics" : {
    "usedOnHeapStorageMemory" : 0,
    "usedOffHeapStorageMemory" : 0,
    "totalOnHeapStorageMemory" : 384093388,
    "totalOffHeapStorageMemory" : 0
  },
  "blacklistedInStages" : [ ],
  "attributes" : { },
  "resources" : { }
} ]
```

**Prometheus**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus
metrics_app_20190911204823_0000_driver_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_driver_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_driver_executor_totalCores_Count 0
metrics_app_20190911204823_0000_driver_executor_maxTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_driver_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_driver_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_driver_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_driver_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_driver_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_driver_executor_totalOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_JVMHeapMemory_Count 230406336
metrics_app_20190911204823_0000_driver_executor_JVMOffHeapMemory_Count 146132592
metrics_app_20190911204823_0000_driver_executor_OnHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapExecutionMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OnHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_OffHeapUnifiedMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_DirectPoolMemory_Count 97049
metrics_app_20190911204823_0000_driver_executor_MappedPoolMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeJVMRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreePythonRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherVMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_ProcessTreeOtherRSSMemory_Count 0
metrics_app_20190911204823_0000_driver_executor_MinorGCCount_Count 8
metrics_app_20190911204823_0000_driver_executor_MinorGCTime_Count 82
metrics_app_20190911204823_0000_driver_executor_MajorGCCount_Count 3
metrics_app_20190911204823_0000_driver_executor_MajorGCTime_Count 128
metrics_app_20190911204823_0000_0_executor_rddBlocks_Count 0
metrics_app_20190911204823_0000_0_executor_memoryUsed_Count 0
metrics_app_20190911204823_0000_0_executor_diskUsed_Count 0
metrics_app_20190911204823_0000_0_executor_totalCores_Count 16
metrics_app_20190911204823_0000_0_executor_maxTasks_Count 16
metrics_app_20190911204823_0000_0_executor_activeTasks_Count 0
metrics_app_20190911204823_0000_0_executor_failedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_completedTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalTasks_Count 0
metrics_app_20190911204823_0000_0_executor_totalDuration_Value 0
metrics_app_20190911204823_0000_0_executor_totalGCTime_Value 0
metrics_app_20190911204823_0000_0_executor_totalInputBytes_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleRead_Count 0
metrics_app_20190911204823_0000_0_executor_totalShuffleWrite_Count 0
metrics_app_20190911204823_0000_0_executor_maxMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_usedOnHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_usedOffHeapStorageMemory_Count 0
metrics_app_20190911204823_0000_0_executor_totalOnHeapStorageMemory_Count 384093388
metrics_app_20190911204823_0000_0_executor_totalOffHeapStorageMemory_Count 0
```

Closes #25770 from dongjoon-hyun/SPARK-29064.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-09-13 21:28:27 +00:00
Liang-Chi Hsieh c610de6952 [SPARK-29042][CORE] Sampling-based RDD with unordered input should be INDETERMINATE
### What changes were proposed in this pull request?

We already have found and fixed the correctness issue before when RDD output is INDETERMINATE. One missing part is sampling-based RDD. This kind of RDDs is order sensitive to its input. A sampling-based RDD with unordered input, should be INDETERMINATE.

### Why are the changes needed?

A sampling-based RDD with unordered input is just like MapPartitionsRDD with isOrderSensitive parameter as true. The RDD output can be different after a rerun.

It is a problem in ML applications.

In ML, sample is used to prepare training data. ML algorithm fits the model based on the sampled data. If rerun tasks of sample produce different output during model fitting, ML results will be unreliable and also buggy.

Each sample is random output, but once you sampled, the output should be determinate.

### Does this PR introduce any user-facing change?

Previously, a sampling-based RDD can possibly come with different output after a rerun.
After this patch, sampling-based RDD is INDETERMINATE. For an INDETERMINATE map stage, currently Spark scheduler will re-try all the tasks of the failed stage.

### How was this patch tested?

Added test.

Closes #25751 from viirya/sample-order-sensitive.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
2019-09-13 14:07:00 -07:00
dengziming 8f632d7045 [MINOR][DOCS] Fix few typos in the java docs
JIRA :https://issues.apache.org/jira/browse/SPARK-29050
'a hdfs' change into  'an hdfs'
'an unique' change into 'a unique'
'an url' change into 'a url'
'a error' change into 'an error'

Closes #25756 from dengziming/feature_fix_typos.

Authored-by: dengziming <dengziming@growingio.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-12 09:30:03 +09:00
Jungtaek Lim (HeartSaVioR) 2736efa32d [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values
### What changes were proposed in this pull request?

This patch ensures accessing recorded values in listener is always after letting listeners fully process all events. To ensure this, this patch adds new class to hide these values and access with methods which will ensure above condition. Without this guard, two threads are running concurrently - 1) listeners process thread 2) test main thread - and race condition would occur.

That's why we also see very odd thing, error message saying condition is met but test failed:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means verification failed, and condition is met just before constructing error message.

The guard is properly placed in many spots, but missed in some places. This patch enforces that it can't be missed.

### Why are the changes needed?

UT fails intermittently and this patch will address the flakyness.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified UT.

Also made the flaky tests artificially failing via applying 50ms of sleep on each onXXX method.

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 methods being failed. (They've marked as X. Just ignore ! as they failed on waiting listener in given timeout and these tests don't deal with these recorded values - it uses other timeout value 1000ms than 10000ms for this listener so affected via side-effect.)

When I applied same in this patch all tests marked as X passed.

Closes #25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-11 10:24:57 -07:00
Liu,Linhong f263909ee1 [SPARK-23243][CORE][FOLLOWUP] Remove todo added by SPARK-23207
### What changes were proposed in this pull request?
PR #22112 fixed the todo added by PR #20393(SPARK-23207). We can remove it now.

### Why are the changes needed?
In order not to confuse developers.

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
no need to test

Closes #25755 from LinhongLiu/remove-todo.

Authored-by: Liu,Linhong <liulinhong@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-09-11 21:51:48 +08:00
mcheah 7f36cd2aa5 [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API
## What changes were proposed in this pull request?

Uses the APIs introduced in SPARK-28209 in the UnsafeShuffleWriter.

## How was this patch tested?

Since this is just a refactor, existing unit tests should cover the relevant code paths. Micro-benchmarks from the original fork where this code was built show no degradation in performance.

Closes #25304 from mccheah/shuffle-writer-refactor-unsafe-writer.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-10 17:30:02 -07:00
hongdd bdc1598a43 [SPARK-28657][CORE] Fix currentContext Instance failed sometimes
## What changes were proposed in this pull request?

Running spark on yarn, I got
```
java.lang.ClassCastException: org.apache.hadoop.ipc.CallerContext$Builder cannot be cast to scala.runtime.Nothing$
```
Utils.classForName return Class[Nothing], I think it should be defind as Class[_] to resolve this issue

## How was this patch tested?

not need

Closes #25389 from hddong/SPARK-28657-fix-currentContext-Instance-failed.

Lead-authored-by: hongdd <jn_hdd@163.com>
Co-authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-09 18:02:52 -05:00
Sean Owen 6378d4bc06 [SPARK-28980][CORE][SQL][STREAMING][MLLIB] Remove most items deprecated in Spark 2.2.0 or earlier, for Spark 3
### What changes were proposed in this pull request?

- Remove SQLContext.createExternalTable and Catalog.createExternalTable, deprecated in favor of createTable since 2.2.0, plus tests of deprecated methods
- Remove HiveContext, deprecated in 2.0.0, in favor of `SparkSession.builder.enableHiveSupport`
- Remove deprecated KinesisUtils.createStream methods, plus tests of deprecated methods, deprecate in 2.2.0
- Remove deprecated MLlib (not Spark ML) linear method support, mostly utility constructors and 'train' methods, and associated docs. This includes methods in LinearRegression, LogisticRegression, Lasso, RidgeRegression. These have been deprecated since 2.0.0
- Remove deprecated Pyspark MLlib linear method support, including LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD
- Remove 'runs' argument in KMeans.train() method, which has been a no-op since 2.0.0
- Remove deprecated ChiSqSelector isSorted protected method
- Remove deprecated 'yarn-cluster' and 'yarn-client' master argument in favor of 'yarn' and deploy mode 'cluster', etc

Notes:

- I was not able to remove deprecated DataFrameReader.json(RDD) in favor of DataFrameReader.json(Dataset); the former was deprecated in 2.2.0, but, it is still needed to support Pyspark's .json() method, which can't use a Dataset.
- Looks like SQLContext.createExternalTable was not actually deprecated in Pyspark, but, almost certainly was meant to be? Catalog.createExternalTable was.
- I afterwards noted that the toDegrees, toRadians functions were almost removed fully in SPARK-25908, but Felix suggested keeping just the R version as they hadn't been technically deprecated. I'd like to revisit that. Do we really want the inconsistency? I'm not against reverting it again, but then that implies leaving SQLContext.createExternalTable just in Pyspark too, which seems weird.
- I *kept* LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD in Pyspark, though deprecated, as it is hard to remove them (still used by StreamingLogisticRegressionWithSGD?) and they are not fully removed in Scala. Maybe should not have been deprecated.

### Why are the changes needed?

Deprecated items are easiest to remove in a major release, so we should do so as much as possible for Spark 3. This does not target items deprecated 'recently' as of Spark 2.3, which is still 18 months old.

### Does this PR introduce any user-facing change?

Yes, in that deprecated items are removed from some public APIs.

### How was this patch tested?

Existing tests.

Closes #25684 from srowen/SPARK-28980.

Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-09 10:19:40 -05:00
colinma dadb72028a [SPARK-28340][CORE] Noisy exceptions when tasks are killed: "DiskBloc…
### What changes were proposed in this pull request?

If a Spark task is killed due to intentional job kills, automated killing of redundant speculative tasks, etc, ClosedByInterruptException occurs if task has unfinished I/O operation with AbstractInterruptibleChannel. A single cancelled task can result in hundreds of stack trace of ClosedByInterruptException being logged.
In this PR, stack trace of ClosedByInterruptException won't be logged like Executor.run do for InterruptedException.

### Why are the changes needed?

Large numbers of spurious exceptions is confusing to users when they are inspecting Spark logs to diagnose other issues.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A

Closes #25674 from colinmjj/spark-28340.

Authored-by: colinma <colinma@tencent.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-09 05:27:53 -05:00
shivusondur cb488ecf41 [SPARK-28942][WEBUI] Spark in local mode hostname display localhost in the Host Column of Task Summary Page
### What changes were proposed in this pull request?
In spark-shell local mode, in the task page, host name is coming as localhost
This PR changes it to show machine IP, as shown in the "spark.driver.host" in the environment page

### Why are the changes needed?
To show the proper IP in the task page host column

### Does this PR introduce any user-facing change?
It updates the SPARK UI->Task page->Host Column

### How was this patch tested?
verfied in spark UI

![image](https://user-images.githubusercontent.com/7912929/64079045-253d9e00-cd00-11e9-8092-26caec4e21dc.png)

Closes #25645 from shivusondur/localhost1.

Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-08 19:45:19 -05:00
Jungtaek Lim (HeartSaVioR) 905b7f7fc7 [SPARK-28967][CORE] Include cloned version of "properties" to avoid ConcurrentModificationException
### What changes were proposed in this pull request?

This patch fixes the bug which throws ConcurrentModificationException when job with 0 partition is submitted via DAGScheduler.

### Why are the changes needed?

Without this patch, structured streaming query throws ConcurrentModificationException, like below stack trace:

```
19/09/04 09:48:49 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514)
	at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520)
	at scala.Option.map(Option.scala:163)
	at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519)
	at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:149)
	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:217)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93)
```

Please refer https://issues.apache.org/jira/browse/SPARK-28967 for detailed reproducer.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Newly added UT. Also manually tested via running simple structured streaming query in spark-shell.

Closes #25672 from HeartSaVioR/SPARK-28967.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-06 09:06:39 -05:00
Wing Yew Poon 151b954e52 [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometimes fail
### What changes were proposed in this pull request?

`ReplayListenerSuite` depends on a listener class to listen for replayed events. This class was implemented by extending `EventLoggingListener`. `EventLoggingListener` does not log executor metrics update events, but uses them to update internal state; on a stage completion event, it then logs stage executor metrics events using this internal state. As executor metrics update events do not get written to the event log, they do not get replayed. The internal state of the replay listener can therefore be different from the original listener, leading to different stage completion events being logged.

We reimplement the replay listener to simply buffer each and every event it receives. This makes it a simpler yet better tool for verifying the events that get sent through the ReplayListenerBus.

### Why are the changes needed?

As explained above. Tests sometimes fail due to events being received by the `EventLoggingListener` that do not get logged (and thus do not get replayed) but influence other events that get logged.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #25673 from wypoon/SPARK-28770.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-09-05 15:55:22 -05:00
Shixiong Zhu 84a4d3a17c
[SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
### What changes were proposed in this pull request?

Use `KeyLock` added in #25612 to simplify `MapOutputTracker.getStatuses`. It also has some improvement after the refactoring:
- `InterruptedException` is no longer sallowed.
- When a shuffle block is fetched, we don't need to wake up unrelated sleeping threads.

### Why are the changes needed?

`MapOutputTracker.getStatuses` is pretty hard to maintain right now because it has a special lock mechanism which we needs to pay attention to whenever updating this method. As we can use `KeyLock` to hide the complexity of locking behind a dedicated lock class, it's better to refactor it to make it easy to understand and maintain.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #25680 from zsxwing/getStatuses.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-09-04 23:20:27 -07:00
Xianjin YE ca71177868 [SPARK-28907][CORE] Review invalid usage of new Configuration()
### What changes were proposed in this pull request?
Replaces some incorrect usage of `new Configuration()` as it will load default configs defined in Hadoop

### Why are the changes needed?
Unexpected config could be accessed instead of the expected config, see SPARK-28203 for example

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existed tests.

Closes #25616 from advancedxy/remove_invalid_configuration.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-04 19:52:19 -05:00
maryannxue a7a3935c97 [SPARK-11150][SQL] Dynamic Partition Pruning
### What changes were proposed in this pull request?
This patch implements dynamic partition pruning by adding a dynamic-partition-pruning filter if there is a partitioned table and a filter on the dimension table. The filter is then planned using a heuristic approach:
1. As a broadcast relation if it is a broadcast hash join. The broadcast relation will then be transformed into a reused broadcast exchange by the `ReuseExchange` rule; or
2. As a subquery duplicate if the estimated benefit of partition table scan being saved is greater than the estimated cost of the extra scan of the duplicated subquery; otherwise
3. As a bypassed condition (`true`).

### Why are the changes needed?
This is an important performance feature.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added UT
- Testing DPP by enabling / disabling the reuse broadcast results feature and / or the subquery duplication feature.
- Testing DPP with reused broadcast results.
- Testing the key iterators on different HashedRelation types.
- Testing the packing and unpacking of the broadcast keys in a LongType.

Closes #25600 from maryannxue/dpp.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2019-09-04 13:13:23 -07:00
Jungtaek Lim (HeartSaVioR) 712874fa09 [SPARK-28931][CORE][TESTS] Fix couple of bugs in FsHistoryProviderSuite
### What changes were proposed in this pull request?

This patch fixes the bugs in test code itself, FsHistoryProviderSuite.

1. When creating log file via `newLogFile`, codec is ignored, leading to wrong file name. (No one tends to create test for test code, as well as the bug doesn't affect existing tests indeed, so not easy to catch.)
2. When writing events to log file via `writeFile`, metadata (in case of new format) gets written to file regardless of its codec, and the content is overwritten by another stream, hence no information for Spark version is available. It affects existing test, hence we have wrong expected value to workaround the bug.

This patch also removes redundant parameter `isNewFormat` in `writeFile`, as according to review comment, Spark no longer supports old format.

### Why are the changes needed?

Explained in above section why they're bugs, though they only reside in test-code. (Please note that the bug didn't come from non-test side of code.)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified existing UTs, as well as read event log file in console to see metadata is not overwritten by other contents.

Closes #25629 from HeartSaVioR/MINOR-FIX-FsHistoryProviderSuite.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-09-04 09:41:15 -07:00
Thomas Graves 4c8f114783 [SPARK-27489][WEBUI] UI updates to show executor resource information
### What changes were proposed in this pull request?
We are adding other resource type support to the executors and Spark. We should show the resource information for each executor on the UI Executors page.
This also adds a toggle button to show the resources column.  It is off by default.

![executorui1](https://user-images.githubusercontent.com/4563792/63891432-c815b580-c9aa-11e9-9f41-62975649efbc.png)

![Screenshot from 2019-08-28 14-56-26](https://user-images.githubusercontent.com/4563792/63891516-fd220800-c9aa-11e9-9fe4-89fcdca37306.png)

### Why are the changes needed?
to show user what resources the executors have. Like Gpus, fpgas, etc

### Does this PR introduce any user-facing change?
Yes introduces UI and rest api changes to show the resources

### How was this patch tested?
Unit tests and manual UI tests on yarn and standalone modes.

Closes #25613 from tgravescs/SPARK-27489-gpu-ui-latest.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2019-09-04 09:45:44 +08:00
Shixiong Zhu 89800931aa
[SPARK-3137][CORE] Replace the global TorrentBroadcast lock with fine grained KeyLock
### What changes were proposed in this pull request?

This PR provides a new lock mechanism `KeyLock` to lock  with a given key. Also use this new lock in `TorrentBroadcast` to avoid blocking tasks from fetching different broadcast values.

### Why are the changes needed?

`TorrentBroadcast.readObject` uses a global lock so only one task can be fetching the blocks at the same time. This is not optimal if we are running multiple stages concurrently because they should be able to independently fetch their own blocks.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #25612 from zsxwing/SPARK-3137.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-09-03 14:09:07 -07:00
Sean Owen eb037a8180 [SPARK-28855][CORE][ML][SQL][STREAMING] Remove outdated usages of Experimental, Evolving annotations
### What changes were proposed in this pull request?

The Experimental and Evolving annotations are both (like Unstable) used to express that a an API may change. However there are many things in the code that have been marked that way since even Spark 1.x. Per the dev thread, anything introduced at or before Spark 2.3.0 is pretty much 'stable' in that it would not change without a deprecation cycle. Therefore I'd like to remove most of these annotations. And, remove the `:: Experimental ::` scaladoc tag too. And likewise for Python, R.

The changes below can be summarized as:
- Generally, anything introduced at or before Spark 2.3.0 has been unmarked as neither Evolving nor Experimental
- Obviously experimental items like DSv2, Barrier mode, ExperimentalMethods are untouched
- I _did_ unmark a few MLlib classes introduced in 2.4, as I am quite confident they're not going to change (e.g. KolmogorovSmirnovTest, PowerIterationClustering)

It's a big change to review, so I'd suggest scanning the list of _files_ changed to see if any area seems like it should remain partly experimental and examine those.

### Why are the changes needed?

Many of these annotations are incorrect; the APIs are de facto stable. Leaving them also makes legitimate usages of the annotations less meaningful.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #25558 from srowen/SPARK-28855.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-01 10:15:00 -05:00
mcheah ea90ea6ce7 [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter
## What changes were proposed in this pull request?

Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.

## How was this patch tested?

Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.

Closes #25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-30 09:43:07 -07:00
Ryan Blue 31b59bd805 [SPARK-28843][PYTHON] Set OMP_NUM_THREADS to executor cores for python if not set
### What changes were proposed in this pull request?

When starting python processes, set `OMP_NUM_THREADS` to the number of cores allocated to an executor or driver if `OMP_NUM_THREADS` is not already set. Each python process will use the same `OMP_NUM_THREADS` setting, even if workers are not shared.

This avoids creating an OpenMP thread pool for parallel processing with a number of threads equal to the number of cores on the executor and [significantly reduces memory consumption](https://github.com/numpy/numpy/issues/10455). Instead, this threadpool should use the number of cores allocated to the executor, if available. If a setting for number of cores is not available, this doesn't change any behavior. OpenMP is used by numpy and pandas.

### Why are the changes needed?

To reduce memory consumption for PySpark jobs.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Validated this reduces python worker memory consumption by more than 1GB on our cluster.

Closes #25545 from rdblue/SPARK-28843-set-omp-num-cores.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-30 10:29:46 +09:00
wuyi 70f4bbccc5 [SPARK-28414][WEBUI] UI updates to show resource info in Standalone
## What changes were proposed in this pull request?

Since SPARK-27371 has supported GPU-aware resource scheduling in Standalone, this PR adds resources info in Standalone UI.

## How was this patch tested?

Updated `JsonProtocolSuite` and tested manually.

Master page:

![masterpage](https://user-images.githubusercontent.com/16397174/62835958-b933c100-bc90-11e9-814f-22bae048303d.png)

Worker page

![workerpage](https://user-images.githubusercontent.com/16397174/63417947-d2790200-c434-11e9-8979-36b8f558afd3.png)

Application page

![applicationpage](https://user-images.githubusercontent.com/16397174/62835964-cbadfa80-bc90-11e9-99a2-26e05421619a.png)

Closes #25409 from Ngone51/SPARK-28414.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-08-27 08:59:29 -05:00
mcheah 2efa6f5dd3 [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice
## What changes were proposed in this pull request?

The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.

## How was this patch tested?

Existing unit tests.

Closes #25341 from mccheah/dont-redundantly-store-part-lengths.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-26 10:39:29 -07:00
Nikita Gorbachevsky 9f8c7a2804 [SPARK-28709][DSTREAMS] Fix StreamingContext leak through Streaming
## What changes were proposed in this pull request?

In my application spark streaming is restarted programmatically by stopping StreamingContext without stopping of SparkContext and creating/starting a new one. I use it for automatic detection of Kafka topic/partition changes and automatic failover in case of non fatal exceptions.

However i notice that after multiple restarts driver fails with OOM. During investigation of heap dump i figured out that StreamingContext object isn't cleared by GC after stopping.

<img width="1901" alt="Screen Shot 2019-08-14 at 12 23 33" src="https://user-images.githubusercontent.com/13151161/63010149-83f4c200-be8e-11e9-9f48-12b6e97839f4.png">

There are several places which holds reference to it :

1. StreamingTab registers StreamingJobProgressListener which holds reference to Streaming Context directly to LiveListenerBus shared queue via ssc.sc.addSparkListener(listener) method invocation. However this listener isn't unregistered at stop method.
2. json handlers (/streaming/json and /streaming/batch/json) aren't unregistered in SparkUI, while they hold reference to StreamingJobProgressListener. Basically the same issue affects all the pages, i assume that renderJsonHandler should be added to pageToHandlers cache on attachPage method invocation in order to unregistered it as well on detachPage.
3. SparkUi holds reference to StreamingJobProgressListener in the corresponding local variable which isn't cleared after stopping of StreamingContext.

## How was this patch tested?

Added tests to existing test suites.
After i applied these changes via reflection in my app OOM on driver side gone.

Closes #25439 from choojoyq/SPARK-28709-fix-streaming-context-leak-on-stop.

Authored-by: Nikita Gorbachevsky <nikitag@playtika.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-26 09:30:36 -05:00
Anton Kirillov f17f1d01e2 [SPARK-28778][MESOS] Fixed executors advertised address when running in virtual network
### What changes were proposed in this pull request?
Resolves [SPARK-28778: Shuffle jobs fail due to incorrect advertised address when running in a virtual network on Mesos](https://issues.apache.org/jira/browse/SPARK-28778).

This patch fixes a bug which occurs when shuffle jobs are launched by Mesos in a virtual network. Mesos scheduler sets executor `--hostname` parameter to `0.0.0.0` in the case when `spark.mesos.network.name` is provided. This makes executors use `0.0.0.0` as their advertised address and, in the presence of shuffle, executors fail to fetch shuffle blocks from each other using `0.0.0.0` as the origin. When a virtual network is used the hostname or IP address is not known upfront and assigned to a container at its start time so the executor process needs to advertise the correct dynamically assigned address to be reachable by other executors.

Changes:
- added a fallback to `Utils.localHostName()` in Spark Executors when `--hostname` is not provided
- removed setting executor address to `0.0.0.0` from Mesos scheduler
- refactored the code related to building executor command in Mesos scheduler
- added network configuration support to Docker containerizer
- added unit tests

### Why are the changes needed?
The bug described above prevents Mesos users from running any jobs which involve shuffle due to the inability of executors to fetch shuffle blocks because of incorrect advertised address when virtual network is used.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- added unit test to `MesosCoarseGrainedSchedulerBackendSuite` which verifies the absence of `--hostname` parameter  when `spark.mesos.network.name` is provided and its presence otherwise
- added unit test to `MesosSchedulerBackendUtilSuite` which verifies that `MesosSchedulerBackendUtil.buildContainerInfo` sets network-related properties for Docker containerizer
- unit tests from this repo launched with profiles: `./build/mvn test -Pmesos -Pnetlib-lgpl -Psparkr -Phive -Phive-thriftserver`, build log attached: [mvn.test.log](https://github.com/apache/spark/files/3516891/mvn.test.log)
- integration tests from [DCOS Spark repo](https://github.com/mesosphere/spark-build), more specifically - [test_spark_cni.py](https://github.com/mesosphere/spark-build/blob/master/tests/test_spark_cni.py) which runs a specific [shuffle job](https://github.com/mesosphere/spark-build/blob/master/tests/jobs/scala/src/main/scala/ShuffleApp.scala) and verifies its successful completion, Mesos task network configuration, and IP addresses for both Mesos and Docker containerizers

Closes #25500 from akirillov/DCOS-45840-fix-advertised-ip-in-virtual-networks.

Authored-by: Anton Kirillov <akirillov@mesosophere.io>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-23 18:30:05 -07:00
HyukjinKwon d25cbd44ee [SPARK-28839][CORE] Avoids NPE in context cleaner when dynamic allocation and shuffle service are on
### What changes were proposed in this pull request?

This PR proposes to avoid to thrown NPE at context cleaner when shuffle service is on - it is kind of a small followup of https://github.com/apache/spark/pull/24817

Seems like it sets `null` for `shuffleIds` to track when the service is on. Later, `removeShuffle` tries to remove an element at `shuffleIds` which leads to NPE. It fixes it by explicitly not sending the event (`ShuffleCleanedEvent`) in this case.

See the code path below:

cbad616d4c/core/src/main/scala/org/apache/spark/SparkContext.scala (L584)

cbad616d4c/core/src/main/scala/org/apache/spark/ContextCleaner.scala (L125)

cbad616d4c/core/src/main/scala/org/apache/spark/ContextCleaner.scala (L190)

cbad616d4c/core/src/main/scala/org/apache/spark/ContextCleaner.scala (L220-L230)

cbad616d4c/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (L353-L357)

cbad616d4c/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (L347)

cbad616d4c/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (L400-L406)

cbad616d4c/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (L475)

cbad616d4c/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (L427)

### Why are the changes needed?

This is a bug fix.

### Does this PR introduce any user-facing change?

It prevents the exception:

```
19/08/21 06:44:01 ERROR AsyncEventQueue: Listener ExecutorMonitor threw an exception
java.lang.NullPointerException
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor$Tracker.removeShuffle(ExecutorMonitor.scala:479)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2(ExecutorMonitor.scala:408)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2$adapted(ExecutorMonitor.scala:407)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.cleanupShuffle(ExecutorMonitor.scala:407)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.onOtherEvent(ExecutorMonitor.sc
```

### How was this patch test?

Unittest was added.

Closes #25551 from HyukjinKwon/SPARK-28839.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-23 12:44:56 -07:00
Kousuke Saruta 33e45ec7b8 [SPARK-28769][CORE] Improve warning message of BarrierExecutionMode when required slots > maximum slots
### What changes were proposed in this pull request?
Improved warning message in Barrier Execution Mode when required slots > maximum slots.
The new message contains information about required slots, maximum slots and how many times retry failed.

### Why are the changes needed?
Providing to users with the number of required slots, maximum slots and how many times retry failed might help users to decide what they should do.
For example, continuing to wait for retry succeeded or killing jobs.

### Does this PR introduce any user-facing change?
Yes.
If `spark.scheduler.barrier.maxConcurrentTaskCheck.maxFailures=3`, we get following warning message.

Before applying this change:

```
19/08/18 15:18:09 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:24 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:39 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:54 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
  at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
  at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
  ... 47 elided
```
After applying this change:

```
19/08/18 16:52:23 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently.
19/08/18 16:52:38 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 1/3 failed).
19/08/18 16:52:53 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 2/3 failed).
19/08/18 16:53:08 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 3/3 failed).
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
  at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
  at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
  ... 47 elided
```

### How was this patch tested?
I tested manually using Spark Shell with following configuration and script. And then, checked log message.

```
$ bin/spark-shell --master local[2] --conf spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3
scala> sc.parallelize(1 to 100, sc.defaultParallelism+1).barrier.mapPartitions(identity(_)).collect
```

Closes #25487 from sarutak/barrier-exec-mode-warning-message.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-22 14:06:58 -05:00
Sean Owen 9ea37b09cf [SPARK-17875][CORE][BUILD] Remove dependency on Netty 3
### What changes were proposed in this pull request?

Spark uses Netty 4 directly, but also includes Netty 3 only because transitive dependencies do. The dependencies (Hadoop HDFS, Zookeeper, Avro) don't seem to need this dependency as used in Spark. I think we can forcibly remove it to slim down the dependencies.

Previous attempts were blocked by its usage in Flume, but that dependency has gone away.
https://github.com/apache/spark/pull/15436

### Why are the changes needed?

Mostly to reduce the transitive dependency size and complexity a little bit and avoid triggering spurious security alerts on Netty 3.x usage.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests

Closes #25544 from srowen/SPARK-17875.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-21 21:27:56 -07:00
WeichenXu 9779a82ea0 [SPARK-28483][CORE][FOLLOW-UP] Dealing with interrupted exception in BarrierTaskContext.barrier()
<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->
Dealing with interrupted exception in BarrierTaskContext.barrier()

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Interrupted exception will happen in the case sparkContext local property "spark.job.interruptOnCancel" set true.

### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
No.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
UT.

Closes #25519 from WeichenXu123/barrier_fl.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-21 19:51:45 +08:00
Dhruve Ashar a50959a7f6 [SPARK-27937][CORE] Revert partial logic for auto namespace discovery
## What changes were proposed in this pull request?
This change reverts the logic which was introduced as a part of SPARK-24149 and a subsequent followup PR.

With existing logic:
- Spark fails to launch with HDFS federation enabled while trying to get a path to a logical nameservice.
- It gets tokens for unrelated namespaces if they are used in HDFS Federation
- Automatic namespace discovery is supported only if these are on the same cluster.

Rationale for change:
- For accessing data from related namespaces, viewfs should handle getting tokens for spark
- For accessing data from unrelated namespaces(user explicitly specifies them using existing configs) as these could be on the same or different cluster.

(Please fill in changes proposed in this fix)
Revert the changes.

## How was this patch tested?
Ran few manual tests and unit test.

Closes #24785 from dhruve/bug/SPARK-27937.

Authored-by: Dhruve Ashar <dhruveashar@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-08-20 12:42:35 -07:00
WeichenXu bc75ed675b [SPARK-28483][CORE] Fix canceling a spark job using barrier mode but barrier tasks blocking on BarrierTaskContext.barrier()
## What changes were proposed in this pull request?

Fix canceling a spark job using barrier mode but barrier tasks do not exit.
Currently, when spark tasks are killed, `BarrierTaskContext.barrier()` cannot be killed (it will blocking on RPC request), cause the task blocking and cannot exit.

In my PR I implement an interface for RPC which support `abort` in class `RpcEndpointRef`
```
  def askAbortable[T: ClassTag](
      message: Any,
      timeout: RpcTimeout): AbortableRpcFuture[T]
```

The returned `AbortableRpcFuture` instance include an `abort` method so that we can abort the RPC before it timeout.

## How was this patch tested?

Unit test added.

Manually test:

### Test code
launch spark-shell via `spark-shell --master local[4]`
and run following code:
```
sc.setLogLevel("INFO")
import org.apache.spark.BarrierTaskContext
val n = 4
def taskf(iter: Iterator[Int]): Iterator[Int] = {
  val context = BarrierTaskContext.get()
  val x = iter.next()
  if (x % 2 == 0) {
    // sleep 6000000 seconds with task killed checking
    for (i <- 0 until 6000000) {
      Thread.sleep(1000)
      if (context.isInterrupted()) {
        throw new org.apache.spark.TaskKilledException()
      }
    }
  }
  context.barrier()
  return Iterator.empty
}

// launch spark job, including 4 tasks, tasks 1/3 will enter `barrier()`, and tasks 0/2 will enter `sleep`
sc.parallelize((0 to n), n).barrier().mapPartitions(taskf).collect()
```
And then press Ctrl+C to exit the running job.

### Before
press Ctrl+C to exit the running job, then open spark UI we can see 2 tasks (task 1/3) are not killed. They are blocking.

### After
press Ctrl+C to exit the running job,  we can see in spark UI all tasks killed successfully.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25235 from WeichenXu123/sc_14848.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-20 14:21:47 +08:00