Commit graph

24433 commits

HyukjinKwon 8b18ef5c7b [MINOR] Avoid hardcoded py4j-0.10.8.1-src.zip in Scala
## What changes were proposed in this pull request?

This PR aims to deduplicate the hardcoded `py4j-0.10.8.1-src.zip` so that future py4j upgrades are easier.

## How was this patch tested?

N/A

Closes #24770 from HyukjinKwon/minor-py4j-dedup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-02 21:23:17 -07:00
Dongjoon Hyun 809821a283 [SPARK-27920][SQL][TEST] Add interceptParseException test utility function
## What changes were proposed in this pull request?

This PR aims to add an `interceptParseException` test utility function to `AnalysisTest` to reduce duplication of `intercept` calls.
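
For illustration, a hedged sketch of what such a shared helper could look like (the actual function added to `AnalysisTest` may differ in signature and behaviour):

```scala
import org.scalatest.Assertions.intercept
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// Factor the repeated intercept[ParseException] { ... } pattern into one helper.
def interceptParseException(parser: String => Any)(sqlCommand: String, messages: String*): Unit = {
  val e = intercept[ParseException](parser(sqlCommand))
  messages.foreach(m => assert(e.getMessage.contains(m)))
}

// Usage sketch: the malformed statement should fail to parse.
interceptParseException(CatalystSqlParser.parsePlan)("SELECT * FROM")
```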

## How was this patch tested?

Pass the Jenkins with the updated test suites.

Closes #24769 from dongjoon-hyun/SPARK-27920.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-02 21:11:35 -07:00
Yuming Wang d53b61c311 [SPARK-27831][SQL][TEST] Move Hive test jars to maven dependency
## What changes were proposed in this pull request?

This PR moves the Hive test jars (`hive-contrib-0.13.1.jar`, `hive-hcatalog-core-0.13.1.jar`, `hive-contrib-2.3.5.jar` and `hive-hcatalog-core-2.3.5.jar`) to Maven dependencies.

## How was this patch tested?

Existing test

Please note that this PR needs to be tested with both `maven` and `sbt`.

Closes #24751 from wangyum/SPARK-27831.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-02 20:23:08 -07:00
Liang-Chi Hsieh 2a88fffacb [SPARK-27873][SQL] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema
## What changes were proposed in this pull request?

If we want to keep corrupt records when reading CSV, we add a new column, `columnNameOfCorruptRecord`, to the schema. But this new column isn't actually a column in the CSV header. So if `enforceSchema` is disabled, `CSVHeaderChecker` throws an exception complaining that the number of columns in the CSV header doesn't match the schema.
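
A minimal reproduction sketch of the scenario (the path and schema are illustrative, assuming a CSV file whose header is `a,b`); before this fix, the header check could fail because the corrupt-record column is not a physical column in the file:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", StringType)
  .add("_corrupt_record", StringType)   // the columnNameOfCorruptRecord column

val df = spark.read
  .option("header", "true")
  .option("enforceSchema", "false")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv("/path/to/data.csv")             // hypothetical file with header "a,b"
```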

## How was this patch tested?

Added test.

Closes #24757 from viirya/SPARK-27873.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-03 11:09:26 +09:00
HyukjinKwon f5317f10b2 [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files
## What changes were proposed in this pull request?

This PR aims to add an integrated test base for various UDF test cases so that Scala UDFs, Python UDFs and Scalar Pandas UDFs can be tested in SBT & Maven tests.

### Problem

One of the problems we face is that `ExtractPythonUDFs` (for Python UDFs and Scalar Pandas UDFs) deals with unevaluable expressions that always have to be wrapped with special plans. This special rule seems to produce many issues, for instance SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.

### Why do we have less test cases dedicated for SQL and plans with Python UDFs?

We have virtually no dedicated SQL (or plan) tests in PySpark to catch such issues because:
  - A developer needs to know the analyzer, the optimizer, SQL, PySpark, Py4J and the Python version differences to write good test cases
  - To test plans, we would have to access plans in the JVM via Py4J, which is tricky, messy and duplicates Scala test cases
  - Usually we just add end-to-end test cases in PySpark, so there are few dedicated examples to refer to when writing such tests in PySpark

It is also a non-trivial overhead to switch test base and method (IMHO).

### How does this PR fix?

This PR adds Python UDFs and Scalar Pandas UDFs into our `*.sql` file based test base at the runtime of SBT / Maven test cases. It generates a Python-pickled instance (consisting of the return type and a native Python function) that is used in the Python or Scalar Pandas UDF and brings it directly into the JVM.

After that, the tests run directly in the JVM (we don't interact via Py4J) - we can just register and run the Python UDF and Scalar Pandas UDF in the JVM.

Currently, I only integrated this change into the SQL file based testing. This is how it works with the test files under the `udf` directory:

After the test files under the 'inputs/udf' directory are detected, it creates three test cases:
  - A Scala UDF test case with a Scala UDF registered as 'udf'.
  - A Python UDF test case with a Python UDF registered as 'udf' iff a Python executable and pyspark are available.
  - A Scalar Pandas UDF test case with a Scalar Pandas UDF registered as 'udf' iff a Python executable, pandas, pyspark and pyarrow are available.

Therefore, UDF test cases have a single input and output file but are executed with three different types of UDFs.

For instance,

```sql
CREATE TEMPORARY VIEW ta AS
SELECT udf(a) AS a, udf('a') AS tag FROM t1
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t2;

CREATE TEMPORARY VIEW tb AS
SELECT udf(a) AS a, udf('a') AS tag FROM t3
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t4;

SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
```

will be run 3 times, once each with a Scala UDF, a Python UDF and a Scalar Pandas UDF.

### Appendix

Plus, this PR adds `IntegratedUDFTestUtils`, which makes it possible to register and execute Python UDFs and Scalar Pandas UDFs as below:

To register Python UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(TestPythonUDF(name = "udf"), spark)
```

To register Scalar Pandas UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(TestScalarPandasUDF(name = "udf"), spark)
```

To use it in the Scala API:

```scala
spark.range(1).select(expr("udf(1)")).show()
```

To use it in SQL:

```scala
sql("SELECT udf(1)").show()
```

This util could be used in the future for better coverage with Scala API combinations as well.

## How was this patch tested?

Tested via the command below:

```bash
build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 milliseconds)
```

[python] unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
```

pyspark unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```

pandas and/or pyarrow unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```

Closes #24752 from HyukjinKwon/udf-tests.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-03 10:03:36 +09:00
HyukjinKwon db48da87f0 [SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations
## What changes were proposed in this pull request?

`spark.sql.execution.arrow.enabled` was added when we added the PySpark Arrow optimization.
Later, in the current master, the SparkR Arrow optimization was added, and it's controlled by the same configuration, `spark.sql.execution.arrow.enabled`.

There are two issues with this:

1. `spark.sql.execution.arrow.enabled` in PySpark was added in 2.3.0, whereas the SparkR optimization was added in 3.0.0. Their stability differs, so it's problematic to change the default value for only one of the two optimizations first.

2. Suppose users want to share one JVM between PySpark and SparkR. They are currently forced to use the optimization for both or neither, because the configuration is set globally.

This PR proposes two separate configuration groups for the PySpark and SparkR Arrow optimizations:

- Deprecate `spark.sql.execution.arrow.enabled`
- Add `spark.sql.execution.arrow.pyspark.enabled` (fallback to `spark.sql.execution.arrow.enabled`)
- Add `spark.sql.execution.arrow.sparkr.enabled`
- Deprecate `spark.sql.execution.arrow.fallback.enabled`
- Add `spark.sql.execution.arrow.pyspark.fallback.enabled` (fallback to `spark.sql.execution.arrow.fallback.enabled`)

Note that `spark.sql.execution.arrow.maxRecordsPerBatch` is used on the JVM side for both.
Note that `spark.sql.execution.arrow.fallback.enabled` was added due to a behaviour change. We don't need it in SparkR - the SparkR side has an automatic fallback.
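
For illustration, a small sketch of toggling the proposed per-language flags from the JVM side (a real PySpark or SparkR session would set the corresponding option on its own side):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Enable the Arrow optimization for PySpark only; SparkR keeps its own flag.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.sparkr.enabled", "false")
```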

## How was this patch tested?

Manually tested, and some unit tests were added.

Closes #24700 from HyukjinKwon/separate-sparkr-arrow.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-03 10:01:37 +09:00
Ajith 3806887afb [SPARK-27907][SQL] HiveUDAF should return NULL in case of 0 rows
## What changes were proposed in this pull request?

When a query returns zero rows, `HiveUDAFFunction` throws an NPE.

## CASE 1:
create table abc(a int)
select histogram_numeric(a,2) from abc // NPE
```
Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.hive.HiveUDAFFunction.eval(hiveUDFs.scala:471)
	at org.apache.spark.sql.hive.HiveUDAFFunction.eval(hiveUDFs.scala:315)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.eval(interfaces.scala:543)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:231)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:97)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:132)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:839)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:839)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:122)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

## CASE 2:
create table abc(a int)
insert into abc values (1)
select histogram_numeric(a,2) from abc where a=3 // NPE

```
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveUDAFFunction.serialize(hiveUDFs.scala:477)
at org.apache.spark.sql.hive.HiveUDAFFunction.serialize(hiveUDFs.scala:315)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:570)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:254)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:97)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:132)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:839)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:839)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:122)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```

Hence add a check to avoid the NPE.

## How was this patch tested?

Added new UT case

Closes #24762 from ajithme/hiveudaf.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-02 10:54:21 -07:00
zhengruifeng 560e7bec6f [SPARK-27847][ML] One-Pass MultilabelMetrics & MulticlassMetrics
## What changes were proposed in this pull request?
compute all metrics with only one pass

## How was this patch tested?
existing tests

Closes #24717 from zhengruifeng/multi_label_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-01 08:32:52 -05:00
gengjiaan 8feb80ad86 [SPARK-27811][CORE][DOCS] Improve docs about spark.driver.memoryOverhead and spark.executor.memoryOverhead.
## What changes were proposed in this pull request?

I found that the docs of `spark.driver.memoryOverhead` and `spark.executor.memoryOverhead` are a little ambiguous.
For example, the original docs of `spark.driver.memoryOverhead` start with `The amount of off-heap memory to be allocated per driver in cluster mode`.
But `MemoryManager` also manages an off-heap memory area used to allocate memory in Tungsten mode.
So the description of `spark.driver.memoryOverhead` is confusing.

`spark.executor.memoryOverhead` has the same problem as `spark.driver.memoryOverhead`.

## How was this patch tested?

Existing UTs.

Closes #24671 from beliefer/improve-docs-of-overhead.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-01 08:19:50 -05:00
Sean Owen aec0869fb2 [SPARK-27896][ML] Fix definition of clustering silhouette coefficient for 1-element clusters
## What changes were proposed in this pull request?

Single-point clusters should have silhouette score of 0, according to the original paper and scikit implementation.

## How was this patch tested?

Existing test suite + new test case.

Closes #24756 from srowen/SPARK-27896.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-31 16:27:20 -07:00
Thomas Graves 1277f8fa92 [SPARK-27362][K8S] Resource Scheduling support for k8s
## What changes were proposed in this pull request?

Add the ability to map the Spark resource configs spark.{executor/driver}.resource.{resourceName} to the Kubernetes container builder so that we request resources (GPUs/FPGAs/etc.) from Kubernetes.
Note that the Spark configs will overwrite any resource configs users put into a pod template.
I added a generic vendor config which is only used by Kubernetes right now. I intentionally didn't put it into the Kubernetes config namespace, just to avoid adding more config prefixes.
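
As a sketch of how these configs would be set in an application (the exact `.vendor` key name and value below are assumptions, since the description only mentions a generic vendor config):

```scala
import org.apache.spark.SparkConf

// Request one GPU per driver and per executor; the vendor entry is the
// generic vendor config mentioned above (its name and value here are assumed).
val conf = new SparkConf()
  .set("spark.driver.resource.gpu.count", "1")
  .set("spark.executor.resource.gpu.count", "1")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
```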

I will add more documentation for this under JIRA SPARK-27492. I think it will be easier to do it all at once to get a cohesive story.

## How was this patch tested?

Unit tests and manually testing on k8s cluster.

Closes #24703 from tgravescs/SPARK-27362.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-05-31 15:26:14 -05:00
gatorsmile 2e84181ec3 [SPARK-27773][FOLLOW-UP] Fix Checkstyle failure
## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/

```
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java:[99] (sizes) LineLength: Line is longer than 100 characters (found 104).
[ERROR] src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java:[101] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java:[103] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java:[105] (sizes) LineLength: Line is longer than 100 characters (found 103).
```

## How was this patch tested?
N/A

Closes #24760 from gatorsmile/updateYarnShuffleServiceMetrics.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-31 09:30:17 -07:00
Thomas Graves 65bd338c62 [SPARK-27897][EXAMPLES] Move the get Gpu resources script to a scripts directory
## What changes were proposed in this pull request?

move the script to a scripts directory based on discussion on https://github.com/apache/spark/pull/24731

## How was this patch tested?

ran script

Closes #24754 from tgravescs/SPARK-27897.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-31 08:04:43 -07:00
Izek Greenfield c647f9011c [SPARK-27862][BUILD] Move to json4s 3.6.6
## What changes were proposed in this pull request?
Move to json4s version 3.6.6
Add scala-xml 1.2.0

## How was this patch tested?

Pass the Jenkins

Closes #24736 from igreenfield/master.

Authored-by: Izek Greenfield <igreenfield@axiomsl.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-30 19:42:56 -05:00
Marco Gaido 93db7b870d [SPARK-27684][SQL] Avoid conversion overhead for primitive types
## What changes were proposed in this pull request?

As outlined in the JIRA by JoshRosen, our conversion mechanism from Catalyst types to Scala ones is pretty inefficient for primitive data types. In these cases, most of the time we are adding useless calls to the `identity` function, or to other functions which return the same value. Using the information we have when we generate the code, we can avoid most of this overhead.

## How was this patch tested?

Here is a simple test which shows the benefit that this PR can bring:
```
test("SPARK-27684: perf evaluation") {
    val intLongUdf = ScalaUDF(
      (a: Int, b: Long) => a + b, LongType,
      Literal(1) :: Literal(1L) :: Nil,
      true :: true :: Nil,
      nullable = false)

    val plan = generateProject(
      MutableProjection.create(Alias(intLongUdf, s"udf")() :: Nil),
      intLongUdf)
    plan.initialize(0)

    var i = 0
    val N = 100000000
    val t0 = System.nanoTime()
    while(i < N) {
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      plan(EmptyRow).get(0, intLongUdf.dataType)
      i += 1
    }
    val t1 = System.nanoTime()
    println(s"Avg time: ${(t1 - t0).toDouble / N} ns")
  }
```
The output before the patch is:
```
Avg time: 51.27083294 ns
```
after, we get:
```
Avg time: 11.85874227 ns
```
which is ~5X faster.

Moreover, a benchmark has been added for Scala UDFs. The output after the patch can be seen in this PR; before the patch, the output was:
```
================================================================================================
UDF with mixed input types
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int/string to string:       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to string wholestage off            257            287          42          0,4        2569,5       1,0X
long/nullable int/string to string wholestage on            158            172          18          0,6        1579,0       1,6X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int/string to option:       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to option wholestage off            104            107           5          1,0        1037,9       1,0X
long/nullable int/string to option wholestage on             80             92          12          1,2         804,0       1,3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int to primitive:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to primitive wholestage off             71             76           7          1,4         712,1       1,0X
long/nullable int to primitive wholestage on             64             71           6          1,6         636,2       1,1X

================================================================================================
UDF with primitive types
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int to string:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to string wholestage off             60             60           0          1,7         600,3       1,0X
long/nullable int to string wholestage on             55             64           8          1,8         551,2       1,1X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int to option:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int to option wholestage off             66             73           9          1,5         663,0       1,0X
long/nullable int to option wholestage on             30             32           2          3,3         300,7       2,2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
long/nullable int/string to primitive:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
long/nullable int/string to primitive wholestage off             32             35           5          3,2         316,7       1,0X
long/nullable int/string to primitive wholestage on             41             68          17          2,4         414,0       0,8X
```
The improvements are particularly visible in the second case, i.e. when only primitive types are used as inputs.

Closes #24636 from mgaido91/SPARK-27684.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
2019-05-30 17:09:19 -07:00
Steven Rand 568512cc82 [SPARK-27773][SHUFFLE] add metrics for number of exceptions caught in ExternalShuffleBlockHandler
## What changes were proposed in this pull request?

Add a metric for the number of exceptions caught in the `ExternalShuffleBlockHandler`, the idea being that spikes in this metric over some time window (or, more desirably, the lack thereof) can be used as an indicator of the health of an external shuffle service. (Where "health" refers to its ability to successfully respond to client requests.)

## How was this patch tested?

Deployed a build of this PR to a YARN cluster, and confirmed that the NodeManagers' JMX metrics include `numCaughtExceptions`.

Closes #24645 from sjrand/SPARK-27773.

Authored-by: Steven Rand <srand@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-30 13:57:15 -07:00
Gengliang Wang 49e7387741 [SPARK-27849][SQL][FOLLOWUP][TEST-MAVEN] Fix the testing regex in DataSourceScanRedactionTest
## What changes were proposed in this pull request?

As explained in https://github.com/apache/spark/pull/24719#pullrequestreview-243064785, the regex `file:/[\\w-_/]+` contains the possible characters I have met in the Jenkins tests.
However, we still miss the `.` symbol:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6415/testReport/junit/org.apache.spark.sql.execution/DataSourceV2ScanExecRedactionSuite/treeString_is_redacted/ :
```
orc *********(redacted).7/sql/core/target/tmp/spark-7ff5f81d-069a-4b5d-9d9a-808addeef115
```

This PR is to fix it by matching any character except `]` or spaces.
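
A hedged illustration of that matching rule (not necessarily the exact pattern used in the suite): any run of characters that is neither `]` nor whitespace, which now also covers paths containing `.`:

```scala
// The character class [^\]\s] matches anything that is not ']' and not whitespace.
val pathPattern = """file:/[^\]\s]+""".r
// A dotted temp-dir path like the Jenkins one above is now matched in full.
println(pathPattern.findFirstIn("file:/hadoop-2.7/sql/core/target/tmp/spark-7ff5f81d-069a"))
```
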
## How was this patch tested?

Unit test

Closes #24745 from gengliangwang/fixRegex.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-30 13:45:12 -07:00
Thomas Graves 0ced4c0b13 [SPARK-27378][YARN] YARN support for GPU-aware scheduling
## What changes were proposed in this pull request?

Add YARN support for GPU-aware scheduling. Since SPARK-20327 already added YARN custom resource support, this JIRA is really just making sure the Spark resource configs get mapped into the YARN resource configs and that the user doesn't specify both the YARN and Spark configs for the known types of resources (gpu and fpga are the known types on YARN).

You can find more details on the design and requirements documented: https://issues.apache.org/jira/browse/SPARK-27376

Note that the running-on-YARN docs already state that to use it, YARN 3.0+ is required. We will add any further documentation under SPARK-20327.

## How was this patch tested?

Unit tests and manually testing on yarn cluster

Closes #24634 from tgravescs/SPARK-27361.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-05-30 13:23:46 -07:00
Dongjoon Hyun 955eef95b3 Revert "[SPARK-27831][SQL][TEST][test-hadoop3.2] Move Hive test jars to maven dependency"
This reverts commit 24180c00e0.
2019-05-30 10:06:55 -07:00
John Zhuge a44b00dfe0 [SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation
## What changes were proposed in this pull request?

Support DROP TABLE from V2 catalogs.
Move DROP TABLE into catalyst.
Move parsing tests for DROP TABLE/VIEW to PlanResolutionSuite to validate existing behavior.
Add new tests to the catalyst parser suite.
Separate DROP VIEW into a different code path from DROP TABLE.
Move DROP VIEW into catalyst as a new operator.
Add a meaningful exception to indicate that views are not currently supported in the v2 catalog.

## How was this patch tested?

New unit tests.
Existing unit tests in catalyst and sql core.

Closes #24686 from jzhuge/SPARK-27813-pr.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-31 00:56:07 +08:00
Fokko Driesprong bd87323003 [SPARK-27757][CORE] Bump Jackson to 2.9.9
## What changes were proposed in this pull request?

This fixes CVE-2019-12086 on Databind: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.9.9

## How was this patch tested?

Existing tests

Closes #24646 from Fokko/SPARK-27757.

Authored-by: Fokko Driesprong <fokko@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-30 09:35:20 -05:00
Thomas Graves 6748b486a9 [SPARK-27835][CORE] Resource Scheduling: change driver config from addresses
## What changes were proposed in this pull request?

Change the driver resource discovery argument for standalone mode to be a file rather than separate address configs per resource. This makes it consistent with how the executor does it, makes it more flexible in the future, and means fewer configs if you have multiple resources.

## How was this patch tested?

Unit tests and basic manually testing to make sure files were parsed properly.

Closes #24730 from tgravescs/SPARK-27835-driver-resourcesFile.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-05-30 07:51:06 -05:00
Yuming Wang db3e746b64 [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
## What changes were proposed in this pull request?

This PR wraps all `PrintWriter` usages with `Utils.tryWithResource` to prevent resource leaks.

## How was this patch tested?

Existing test

Closes #24739 from wangyum/SPARK-27875.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-30 19:54:32 +09:00
Thomas Graves d0a5aea12e [SPARK-27725][EXAMPLES] Add an example discovery Script for GPU resources
## What changes were proposed in this pull request?

Example GPU resource discovery script that can be used with Nvidia GPUs and passed into SPARK via spark.{driver/executor}.resource.gpu.discoveryScript

For example:
./bin/spark-shell --master yarn --deploy-mode client --driver-memory 1g  --conf spark.yarn.am.memory=3g --num-executors 1 --executor-memory 1g  --conf spark.driver.resource.gpu.count=2  --executor-cores 1 --conf spark.driver.resource.gpu.discoveryScript=/home/tgraves/workspace/tgravescs-spark/examples/src/main/resources/getGpusResources.sh  --conf spark.executor.resource.gpu.count=1 --conf spark.task.resource.gpu.count=1 --conf spark.executor.resource.gpu.discoveryScript=/home/tgraves/workspace/tgravescs-spark/examples/src/main/resources/getGpusResources.sh

## How was this patch tested?

Manually tested local cluster mode and yarn mode. Tested on a node with 8 GPUs and one with 2 GPUs.

Closes #24731 from tgravescs/SPARK-27725.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-29 22:36:44 -07:00
John Zhuge 953b8e8206 [SPARK-26946][SQL][FOLLOWUP] Require lookup function
## What changes were proposed in this pull request?

Require the lookup function with interface LookupCatalog. Rationale is in the review comments below.

Make `Analyzer` abstract. BaseSessionStateBuilder and HiveSessionStateBuilder implement lookupCatalog with a call to SparkSession.catalog().

Existing test cases and those that don't need catalog lookup will use a newly added `TestAnalyzer` with a default lookup function that throws `CatalogNotFoundException("No catalog lookup function")`.

Rewrote the unit test for LookupCatalog to demonstrate the interface can be used anywhere, not just Analyzer.

Removed the Analyzer parameter `lookupCatalog` because we can override it in the following manner:
```
new Analyzer() {
  override def lookupCatalog(name: String): CatalogPlugin = ???
}
```

## How was this patch tested?

Existing unit tests.

Closes #24689 from jzhuge/SPARK-26946-follow.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-30 09:22:42 +08:00
ShuMingLi 7d2fec31e7 [MINOR][DOC] Fix spelling mistake
## What changes were proposed in this pull request?

A very minor spelling mistake.

## How was this patch tested?
No

Closes #24710 from LiShuMing/minor-spelling.

Authored-by: ShuMingLi <ming.moriarty@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-29 18:21:47 -05:00
Marcelo Vanzin 09ed64d795 [SPARK-27868][CORE] Better default value and documentation for socket server backlog.
First, there is currently no public documentation for this setting. So it's hard
to even know that it could be a problem if your application starts failing with
weird shuffle errors.

Second, the javadoc attached to the code was incorrect; the default value just uses
the default value from the JRE, which is 50, instead of having an unbounded queue
as the comment implies.

So use a default that is a "rounded" version of the JRE default, and provide
documentation explaining that this value may need to be adjusted. Also added
a log message that was very helpful in debugging an issue caused by this
problem.

Closes #24732 from vanzin/SPARK-27868.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-29 14:56:36 -07:00
Gengliang Wang c1007c2f7c [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/17397, the output of FileTable and DataSourceV2ScanExecBase can contain sensitive information (like Amazon keys). Such information should not end up in logs, or be exposed to non-privileged users.

This PR adds a redaction facility for these outputs to resolve the issue. A user can enable this by setting a regex in the same `spark.redaction.string.regex` configuration as V1.

## How was this patch tested?

Unit test

Closes #24719 from gengliangwang/RedactionSuite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-29 13:32:21 -07:00
Aaruna bfa7f112e3 [SPARK-27869][CORE] Redact sensitive information in System Properties from UI
## What changes were proposed in this pull request?

Currently system properties are not redacted. This PR fixes that, so that any credentials passed as System properties are redacted as well.

## How was this patch tested?

Manual test. Run the following and see the UI.
```
bin/spark-shell --conf 'spark.driver.extraJavaOptions=-DMYSECRET=app'
```

Closes #24733 from aaruna/27869.

Authored-by: Aaruna <aaruna.godthi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-29 10:30:16 -07:00
Josh Rosen 19aaf0f784 [SPARK-27829][SQL] In Dataset.joinWith() inner joins, don't nest data before shuffling
## What changes were proposed in this pull request?

In order to support outer joins with null top-level objects, SPARK-15441 modified Dataset.joinWith to project both inputs into single-column structs prior to the join.

For inner joins, however, this step is unnecessary and actually harms performance: performing the nesting before the join increases the shuffled data size. As an optimization for inner joins only, we can move this nesting to occur after the join (effectively switching back to the pre-SPARK-15441 behavior; see #13425).

## How was this patch tested?

Existing tests, which I strengthened to also make assertions about the join result's nullability (since this guards against a bug I almost introduced during prototyping).

Here's a quick `spark-shell` experiment demonstrating the reduction in shuffle size:

```scala
// With --conf spark.shuffle.compress=false
sql("set spark.sql.autoBroadcastJoinThreshold=-1") // for easier shuffle measurements
case class Foo(a: Long, b: Long)
val left = spark.range(10000).map(x => Foo(x, x))
val right = spark.range(10000).map(x => Foo(x, x))
left.joinWith(right, left("a") === right("a"), "inner").rdd.count()
left.joinWith(right, left("a") === right("a"), "left").rdd.count()
```

With inner join (which benefits from this PR's optimization) we shuffle 546.9 KiB. With left outer join (whose plan hasn't changed, therefore being a representation of the state before this PR) we shuffle 859.4 KiB. Shuffle compression (which is enabled by default) narrows this gap a bit: with compression, outer joins shuffle about 12% more than inner joins.

Closes #24693 from JoshRosen/fast-join-with-for-inner-joins.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-29 16:12:24 +08:00
Yuming Wang 67582fdbfe [SPARK-27737][SQL][FOLLOW-UP] Move sql/hive-thriftserver/v2.3.4 to sql/hive-thriftserver/v2.3.5
## What changes were proposed in this pull request?

This pr moves `sql/hive-thriftserver/v2.3.4` to `sql/hive-thriftserver/v2.3.5` based on ([comment](https://github.com/apache/spark/pull/24628#issuecomment-496459258)).

## How was this patch tested?

N/A

Closes #24728 from wangyum/SPARK-27737-thriftserver.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-28 16:08:58 -07:00
Shixiong Zhu 04f142db9c [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader
## What changes were proposed in this pull request?

`ExecutorClassLoader`'s `findClass` may fail to fetch a class due to transient exceptions. For example, when a task is interrupted, if `ExecutorClassLoader` is fetching a class, you may see `InterruptedException` or `IOException` wrapped by `ClassNotFoundException`, even if this class can be loaded. Then the result of `findClass` will be cached by the JVM, and later, when the same class is being loaded in the same executor, it will just throw `NoClassDefFoundError` even though the class could be loaded.

I found that the JVM only caches `LinkageError` and `ClassNotFoundException`. Hence, in this PR, I changed `ExecutorClassLoader` to throw `RemoteClassLoadedError` if we cannot get a response from the driver.
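
A hedged sketch of the idea (the class and method names here are illustrative, not the actual `ExecutorClassLoader` code): wrap transient fetch failures in an `Error` subclass so the JVM does not cache them the way it caches `ClassNotFoundException`:

```scala
import java.io.IOException

// Not a LinkageError or ClassNotFoundException, so the JVM will retry loading the class later.
class RemoteClassLoadedError(message: String, cause: Throwable)
  extends Error(message, cause)

// fetchFromDriver stands in for the real remote fetch; its failures are transient.
def loadClassBytes(name: String, fetchFromDriver: String => Array[Byte]): Array[Byte] = {
  try fetchFromDriver(name)
  catch {
    case e @ (_: IOException | _: InterruptedException) =>
      throw new RemoteClassLoadedError(s"Could not fetch class $name from the driver", e)
  }
}
```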

## How was this patch tested?

New unit tests.

Closes #24683 from zsxwing/SPARK-20547-fix.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-05-28 12:56:14 -07:00
Yuming Wang 4e61de4380 [SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
## What changes were proposed in this pull request?
[`DataSourceUtils.isDataPath(path)`](https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L95) should be `DataSourceUtils.isDataPath(status.getPath)`.

This PR fixes this issue.

## How was this patch tested?

unit tests

Closes #24725 from wangyum/SPARK-27863.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-28 09:28:35 -07:00
MJ Tang 1824cbfa39 [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFailure

## What changes were proposed in this pull request?

The failure log format is fixed according to the JDK implementation.

## How was this patch tested?
Manual tests have been done. The new failure log format would be like:
java.lang.RuntimeException: Failed to finish the task
	at com.xxx.Test.test(Test.java:106)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.privateRun(TestRunner.java:648)
	at org.testng.TestRunner.run(TestRunner.java:505)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
	at org.testng.TestNG.runSuites(TestNG.java:1028)
	at org.testng.TestNG.run(TestNG.java:996)
	at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
	at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123)
Caused by: java.io.FileNotFoundException: File is not found
	at com.xxx.Test.test(Test.java:105)
	... 24 more

Closes #24684 from breakdawn/master.

Authored-by: MJ Tang <mingjtang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-28 09:29:46 -05:00
gengjiaan c30b5297bc [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
## What changes were proposed in this pull request?

I checked the code of `org.apache.spark.sql.execution.datasources.DataSource` and found duplicate Java reflection:
`sourceSchema`, `createSource`, `createSink`, `resolveRelation` and `writeAndRead` all call `providingClass.getConstructor().newInstance()`.
The instance of `providingClass` is stateless; examples include:
`KafkaSourceProvider`
`RateSourceProvider`
`TextSocketSourceProvider`
`JdbcRelationProvider`
`ConsoleSinkProvider`

AFAIK, Java reflection can result in a significant performance cost.
The Oracle website [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html) describes the performance overhead of Java reflection:

```
Performance Overhead
Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications.
```

I have found some performance-cost tests of Java reflection:
[https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/) contains a performance-cost test.
[https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance) has a discussion of Java reflection performance.

So I think we should avoid duplicate Java reflection and reuse the instance created from `providingClass`.
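
A minimal sketch of the reuse idea (not the actual `DataSource` code; the class and member names below are illustrative):

```scala
// Create the provider instance once via reflection and share it, instead of
// calling getConstructor().newInstance() inside every method.
class DataSourceLike(providingClass: Class[_]) {
  private lazy val providingInstance: Any =
    providingClass.getConstructor().newInstance()

  def sourceSchema(): Any = providingInstance   // reflection runs at most once
  def createSource(): Any = providingInstance   // subsequent calls reuse the instance
}
```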

## How was this patch tested?

Existing UTs.

Closes #24647 from beliefer/optimize-DataSource.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-28 09:26:06 -05:00
HyukjinKwon 90b6cda9af [SPARK-25944][R][BUILD] AppVeyor change to latest R version (3.6.0)
## What changes were proposed in this pull request?

R 3.6.0 was released on 2019-04-26. This PR changes the R version from 3.5.1 to 3.6.0 in AppVeyor.

This PR sets `R_REMOTES_NO_ERRORS_FROM_WARNINGS` to `true` to avoid the warnings below:

```
Error in strptime(xx, f, tz = tz) :
  (converted from warning) unable to identify current timezone 'C':
please set environment variable 'TZ'
Error in i.p(...) :
  (converted from warning) installation of package 'praise' had non-zero exit status
Calls: <Anonymous> ... with_rprofile_user -> with_envvar -> force -> force -> i.p
Execution halted
```

## How was this patch tested?

AppVeyor

Closes #24716 from HyukjinKwon/SPARK-27848.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-28 14:42:03 +09:00
wenxuanguan 35952cb42c [SPARK-27859][SS] Use efficient sorting instead of .sorted.reverse sequence
## What changes were proposed in this pull request?

Use a descending sort in `HDFSMetadataLog.getLatest` instead of the two actions of an ascending sort followed by a reverse.
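
A small sketch of the change on a plain Scala sequence; the real code operates on the batch IDs in `HDFSMetadataLog`:

```scala
val batchIds = Seq(3L, 1L, 7L, 5L)

val before = batchIds.sorted.reverse                  // ascending sort, then reverse
val after  = batchIds.sorted(Ordering[Long].reverse)  // single descending sort

assert(before == after)  // same result, one traversal fewer
```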

## How was this patch tested?

Jenkins

Closes #24711 from wenxuanguan/bug-fix-hdfsmetadatalog.

Authored-by: wenxuanguan <choose_home@126.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-27 21:53:23 -07:00
wuyi 826ee6074e [SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens
## What changes were proposed in this pull request?

### Standalone HA Background

In Spark Standalone HA mode, we'll have multiple masters running at the same time. But there's only one master leader, which actively serves scheduling requests. Once this master leader crashes, the other masters compete for leadership, and only one master is guaranteed to be elected as the new master leader, which reconstructs the state from the original master leader and continues to serve scheduling requests.

### Related Issues

#2828 first introduced the bug of *duplicate Worker registration*, and #3447 fixed it. But there are still corner cases (see SPARK-23191 for details) that #3447 does not cover:

* CASE 1
(1) Initially, the Worker registered with Master A.
(2) After a while, the connection channel between Master A and the Worker becomes inactive (e.g. due to a network drop), and the Worker is notified about that via `onDisconnected` called from NettyRpcEnv
(3) When the Worker invokes `onDisconnected`, it attempts to reconnect to all masters (including Master A)
(4) Meanwhile, the network between the Worker and Master A recovers, and the Worker successfully registers with Master A again
(5) Master A responds with `RegisterWorkerFailed("Duplicate worker ID")`
(6) The Worker receives that msg and exits

* CASE 2
(1) Master A loses leadership (sends `RevokedLeadership` to itself). Master B takes over, recovers everything from Master A (which registers the workers for the first time in Master B) and sends `MasterChanged` to the Worker
(2) Before Master A receives `RevokedLeadership`, it receives a late `HeartBeat` from the Worker (which had previously been removed from Master A due to heartbeat timeout), so it sends `ReconnectWorker` to the Worker
(3) The Worker receives `MasterChanged` before `ReconnectWorker`, changing its masterRef to Master B
(4) Subsequently, the Worker receives `ReconnectWorker` from Master A, so it reconnects to all masters
(5) Master B receives the register request again from the Worker and responds with `RegisterWorkerFailed("Duplicate worker ID")`
(6) The Worker receives that msg and exits

In CASE 1, it is difficult for the Worker to know Master A's state. Normally, the Worker thinks Master A has already died and does not expect Master A to respond to the Worker's re-connect request.

In CASE 2, we can see a race condition between `RevokedLeadership` and `HeartBeat`. Actually, Master A has already had its leadership revoked while processing the `HeartBeat` msg. That means the state between the Master and ZooKeeper could be out of sync for a while.

### Solutions

In this PR, instead of exiting the Worker process when *duplicate Worker registration* happens, we suggest logging a warning about it. This is fine since the Master actually performs a no-op when it receives a duplicate registration from a Worker. In turn, the Worker can continue living with that Master normally without any side effect.

## How was this patch tested?

Tested Manually.

I followed the steps Neeraj Gupta suggested in JIRA SPARK-23191 to reproduce case 1.

Before this PR, the Worker would show as DEAD in the UI.
After this PR, the Worker just warns about the duplicate registration (as you can see in the second-to-last line of the log snippet below), and stays ALIVE in the UI.

```
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: wuyi.local:7077 Disassociated !
19/05/09 20:58:32 INFO Worker: Connecting to master wuyi.local:7077...
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: Not spawning another attempt to register with the master, since there is an attempt scheduled already.
19/05/09 20:58:37 WARN TransportClientFactory: DNS resolution for wuyi.local/127.0.0.1:7077 took 5005 ms
19/05/09 20:58:37 INFO TransportClientFactory: Found inactive connection to wuyi.local/127.0.0.1:7077, creating a new one.
19/05/09 20:58:37 INFO TransportClientFactory: Successfully created connection to wuyi.local/127.0.0.1:7077 after 3 ms (0 ms spent in bootstraps)
19/05/09 20:58:37 WARN Worker: Duplicate registration at master spark://wuyi.local:7077
19/05/09 20:58:37 INFO Worker: Successfully registered with master spark://wuyi.local:7077
```

Closes #24569 from Ngone51/fix-worker-dup-register-error.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-28 11:59:29 +08:00
Gabbi Merz 29e154b2f1 [SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types
## What changes were proposed in this pull request?

This PR aims to fix an issue with union Avro types that have more than one non-null type (for instance `["string", "null", "int"]`), whose deserialization to a DataFrame would throw a `java.lang.ArrayIndexOutOfBoundsException`. The issue was that the `fieldWriter` relied on the index from the Avro schema before nulls were filtered out.

## How was this patch tested?

A test for the case of multiple non-null types was added, and the tests were run with sbt via `testOnly org.apache.spark.sql.avro.AvroSuite`.

Closes #24722 from gcmerz/master.

Authored-by: Gabbi Merz <gmerz@palantir.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-27 20:09:23 -07:00
Tom van Bussel 00a8c85a44 [SPARK-27071][CORE] Expose additional metrics in status.api.v1.StageData
## What changes were proposed in this pull request?

This PR exposes additional metrics in `status.api.v1.StageData`. These metrics were already computed for `LiveStage`, but they were never exposed to the user. This includes extra metrics about the JVM GC, executor (de)serialization times, shuffle reads and writes, and more.

## How was this patch tested?

Existing tests.

cc hvanhovell

Closes #24011 from tomvanbussel/SPARK-27071.

Authored-by: Tom van Bussel <tom.vanbussel@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2019-05-27 17:36:22 +02:00
zhengruifeng 32461d4744 [SPARK-27777][ML] Eliminate unnecessary sliding job in AreaUnderCurve
## What changes were proposed in this pull request?
compute AUC on one pass

## How was this patch tested?
existing tests

performance tests:
```
import org.apache.spark.mllib.evaluation._
val scoreAndLabels = sc.parallelize(Array.range(0, 100000).map{ i => (i.toDouble / 100000, (i % 2).toDouble) }, 4)
scoreAndLabels.persist()
scoreAndLabels.count()
val tic = System.currentTimeMillis
(0 until 100).foreach{i => val metrics = new BinaryClassificationMetrics(scoreAndLabels, 0); val auc = metrics.areaUnderROC; metrics.unpersist}
val toc = System.currentTimeMillis
toc - tic
```

|New (ms)| Existing (ms)|
|------|----------|
|87532|103644|

One-pass AUC saves about 16% computation time.

Closes #24648 from zhengruifeng/auc_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-27 10:31:07 -05:00
Yuanjian Li 8949bc7a3c [SPARK-27665][CORE] Split fetch shuffle blocks protocol from OpenBlocks
## What changes were proposed in this pull request?

In the current approach in OneForOneBlockFetcher, we reuse the OpenBlocks protocol to describe the fetch request for shuffle blocks, which makes extension work for shuffle fetching, like #19788 and #24110, very awkward.
In this PR, we split the fetch request for shuffle blocks out of OpenBlocks into a new message named FetchShuffleBlocks. It is loosely bound to ShuffleBlockId and can easily be extended by adding new fields to this protocol.

## How was this patch tested?

Existing and new added UT.

Closes #24565 from xuanyuanking/SPARK-27665.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-05-27 22:19:31 +08:00
Wenchen Fan 6506616b97 [SPARK-27803][SQL][PYTHON] Fix column pruning for Python UDF
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/22104 , we create the python-eval nodes at the end of the optimization phase, which causes a problem.

After the main optimization batch, Filter and Project nodes are usually pushed to the bottom, near the scan node. However, if we extract Python UDFs from Filter/Project, and create a python-eval node under Filter/Project, it will break column pruning/filter pushdown of the scan node.

There are some hacks in the `ExtractPythonUDFs` rule to duplicate the column pruning and filter pushdown logic. However, it has some bugs, as demonstrated in the new test case (only column pruning is broken). This PR removes the hacks and re-applies the column pruning and filter pushdown rules explicitly.

**Before:**

```
...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
   +- Relation[a#168L,b#169L] parquet

== Optimized Logical Plan ==
Project [a#168L]
+- Project [a#168L, b#169L]
   +- Filter pythonUDF0#174: boolean
      +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
         +- Relation[a#168L,b#169L] parquet

== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Project [a#168L, b#169L]
   +- *(2) Filter pythonUDF0#174: boolean
      +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
         +- *(1) FileScan parquet [a#168L,b#169L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-798bae3c-a2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
```

**After:**

```
...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
   +- Relation[a#168L,b#169L] parquet

== Optimized Logical Plan ==
Project [a#168L]
+- Filter pythonUDF0#174: boolean
   +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
      +- Project [a#168L]
         +- Relation[a#168L,b#169L] parquet

== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Filter pythonUDF0#174: boolean
   +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
      +- *(1) FileScan parquet [a#168L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-9500cafb-78..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint>
```

## How was this patch tested?

new test

Closes #24675 from cloud-fan/python.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-27 21:39:59 +09:00
Dilip Biswal 5060647bb1 [SPARK-27782][SQL] Use '#' to mark expression id embedded in the name field of SubqueryExec operator
## What changes were proposed in this pull request?
This is a minor PR to use `#` as a marker for the expression id that is embedded in the name field of the SubqueryExec operator.

## How was this patch tested?
Added a small test in SubquerySuite.

Closes #24652 from dilipbiswal/subquery-name.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-26 20:47:25 -07:00
Yuming Wang 447bfdec83 [SPARK-27844][SQL] Avoid hard-coded config: spark.rdd.parallelListingThreshold in SQL module
## What changes were proposed in this pull request?

Avoid hard-coded config: `spark.rdd.parallelListingThreshold`.

## How was this patch tested?

N/A

Closes #24708 from wangyum/spark.rdd.parallelListingThreshold.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-26 09:00:01 -07:00
Yuming Wang 193304b51b [SPARK-27441][SQL][TEST] Add read/write tests to Hive serde tables
## What changes were proposed in this pull request?

The Hive, Parquet and ORC versions after the built-in Hive is upgraded to 2.3.5 for Hadoop 3.2:

- built-in Hive is 1.2.1.spark2:

  | ORC | Parquet
-- | -- | --
Spark datasource table | 1.5.5 | 1.10.1
Spark hive table | Hive built-in | 1.6.0
Apache Hive 1.2.1 | Hive built-in | 1.6.0

- built-in Hive is 2.3.5:

  | ORC | Parquet
-- | -- | --
Spark datasource table | 1.5.5 | 1.10.1
Spark hive table | 1.5.5 | [1.10.1](https://github.com/apache/spark/pull/24346)
Apache Hive 2.3.5 | 1.3.4 | 1.8.1

We should add tests for Hive serde tables. This PR adds tests for reading/writing all supported data types using Parquet and ORC.

## How was this patch tested?

unit tests

Closes #24345 from wangyum/SPARK-27441.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-26 08:35:58 -07:00
Yuming Wang dcacfc5da6 [SPARK-27074][SQL][test-hadoop3.2][test-maven] Hive 3.1 metastore support HiveClientImpl.runHive
## What changes were proposed in this pull request?

Hive 3.1.1's `CommandProcessor` has 2 changes:
1. [HIVE-17626](https://issues.apache.org/jira/browse/HIVE-17626) (Hive 3.0.0) added ReExecDriver. So the current code path is: 02bbe977ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (L736-L742)
We can disable `hive.query.reexecution.enabled` to work around this change.
2. [HIVE-18238](http://issues.apache.org/jira/browse/HIVE-18238) (Hive 3.0.0) changed the `Driver.close()` return type. We can work around it with `driver.getClass.getMethod("close").invoke(driver)`.

So the Hive 3.1 metastore can support `HiveClientImpl.runHive` after this PR.

## How was this patch tested?

unit tests

Closes #23992 from wangyum/SPARK-27074.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-26 08:24:41 -07:00
zhengruifeng be9e9466e2 [SPARK-27787][ML] Eliminate unnecessary job to compute SSreg
## What changes were proposed in this pull request?
Eliminate the unnecessary job to compute SSreg.
Compute SSreg based on the summary of predictions.

## How was this patch tested?
existing tests

Closes #24656 from zhengruifeng/RegressionMetrics_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-26 08:16:32 -05:00
10087686 3df86a2581 [SPARK-27147][CORE][TEST] Add SortShuffleWriterSuite
## What changes were proposed in this pull request?
There are no unit test cases for SortShuffleWriter, so add new test cases.

## How was this patch tested?
new test cases

Closes #24080 from wangjiaochun/UtestForSortShuffleWriter.

Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-25 21:03:10 -07:00
rrusso2007 ebd1431a5a [SPARK-27801][SQL] Improve performance of InMemoryFileIndex.listLeafFiles for HDFS directories with many files
## What changes were proposed in this pull request?

InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem. DistributedFileSystem overrides the listLocatedStatus method in order to do it with 1 single namenode call thus saving thousands of calls to getBlockLocations.

Currently in InMemoryFileIndex, all directory listings are done using FileSystem.listStatus followed by individual calls to FileSystem.getFileBlockLocations. This is painstakingly slow for folders that have large numbers of files because this process happens serially and parallelism is only applied at the folder level, not the file level.

FileSystem also provides another API listLocatedStatus which returns the LocatedFileStatus objects that already have the block locations. In FileSystem main class this just delegates to listStatus and getFileBlockLocations similarly to the way Spark does it. However when HDFS specifically is the backing file system, DistributedFileSystem overrides this method and simply makes one single call to the namenode to retrieve the directory listing with the block locations. This avoids potentially thousands or more calls to namenode and also is more consistent because files will either exist with locations or not exist instead of having the FileNotFoundException exception case.

For our example directory with 6500 files, the load time of spark.read.parquet was reduced 96x from 76 seconds to .8 seconds. This savings only goes up with the number of files in the directory.

In the pull request instead of using this method always which could lead to a FileNotFoundException that could be tough to decipher in the default FileSystem implementation, this method is only used when the FileSystem is a DistributedFileSystem and otherwise the old logic still applies.
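
A sketch of the two listing strategies described above using the Hadoop `FileSystem` API directly (not the actual `InMemoryFileIndex` code; the path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

val fs = FileSystem.get(new Configuration())
val dir = new Path("/some/dir")

// Old approach: one listing call plus one getFileBlockLocations call per file.
val statusesWithLocations = fs.listStatus(dir).map { status =>
  (status, fs.getFileBlockLocations(status, 0, status.getLen))
}

// New approach on HDFS: a single listLocatedStatus call returns statuses that
// already carry block locations (one namenode round trip on DistributedFileSystem).
val iter = fs.listLocatedStatus(dir)
val located = scala.collection.mutable.ArrayBuffer.empty[LocatedFileStatus]
while (iter.hasNext) located += iter.next()
```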

## How was this patch tested?

test suite ran

Closes #24672 from rrusso2007/master.

Authored-by: rrusso2007 <rrusso2007@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-25 15:49:30 -07:00