Commit graph

24847 commits

Author SHA1 Message Date
Stavros Kontopoulos 4a2c662315 [SPARK-27921][PYTHON][SQL][TESTS][FOLLOW-UP] Add UDF cases into group by clause in 'udf-group-analytics.sql'
## What changes were proposed in this pull request?

This PR is a followup of a fix as described in here: #25215 (comment)
<details><summary>Diff comparing to 'group-analytics.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
index 3439a05727..de297ab166 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
 -13,9 +13,9  struct<>

 -- !query 1
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH CUBE
+SELECT udf(a + b), b, udf(SUM(a - b)) FROM testData GROUP BY udf(a + b), b WITH CUBE
 -- !query 1 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast((a - b) as bigint)) as string)) AS BIGINT):bigint>
 -- !query 1 output
 2	1	0
 2	NULL	0
 -33,9 +33,9  NULL	NULL	3

 -- !query 2
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH CUBE
+SELECT udf(a), udf(b), SUM(b) FROM testData GROUP BY udf(a), b WITH CUBE
 -- !query 2 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(b as string)) AS INT):int,sum(b):bigint>
 -- !query 2 output
 1	1	1
 1	2	2
 -52,9 +52,9  NULL	NULL	9

 -- !query 3
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
+SELECT udf(a + b), b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
 -- !query 3 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,sum((a - b)):bigint>
 -- !query 3 output
 2	1	0
 2	NULL	0
 -70,9 +70,9  NULL	NULL	3

 -- !query 4
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP
+SELECT udf(a), b, udf(SUM(b)) FROM testData GROUP BY udf(a), b WITH ROLLUP
 -- !query 4 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast(b as bigint)) as string)) AS BIGINT):bigint>
 -- !query 4 output
 1	1	1
 1	2	2
 -97,7 +97,7  struct<>

 -- !query 6
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY udf(course), year
 -- !query 6 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 6 output
 -111,7 +111,7  dotNET	2013	48000

 -- !query 7
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, udf(year)
 -- !query 7 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 7 output
 -127,9 +127,9  dotNET	2013	48000

 -- !query 8
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
+SELECT course, udf(year), SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
 -- !query 8 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,CAST(udf(cast(year as string)) AS INT):int,sum(earnings):bigint>
 -- !query 8 output
 Java	NULL	50000
 NULL	2012	35000
 -138,26 +138,26  dotNET	NULL	63000

 -- !query 9
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course)
+SELECT course, year, udf(SUM(earnings)) FROM courseSales GROUP BY course, year GROUPING SETS(course)
 -- !query 9 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,year:int,CAST(udf(cast(sum(cast(earnings as bigint)) as string)) AS BIGINT):bigint>
 -- !query 9 output
 Java	NULL	50000
 dotNET	NULL	63000

 -- !query 10
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
+SELECT udf(course), year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
 -- !query 10 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int,sum(earnings):bigint>
 -- !query 10 output
 NULL	2012	35000
 NULL	2013	78000

 -- !query 11
-SELECT course, SUM(earnings) AS sum FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+SELECT course, udf(SUM(earnings)) AS sum FROM courseSales
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, udf(sum)
 -- !query 11 schema
 struct<course:string,sum:bigint>
 -- !query 11 output
 -173,7 +173,7  dotNET	63000

 -- !query 12
 SELECT course, SUM(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY udf(course), sum
 -- !query 12 schema
 struct<course:string,sum:bigint,grouping_id(course, earnings):int>
 -- !query 12 output
 -188,10 +188,10  dotNET	63000	1

 -- !query 13
-SELECT course, year, GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
+SELECT udf(course), udf(year), GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
 GROUP BY CUBE(course, year)
 -- !query 13 schema
-struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,CAST(udf(cast(year as string)) AS INT):int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
 -- !query 13 output
 Java	2012	0	0	0
 Java	2013	0	0	0
 -205,7 +205,7  dotNET	NULL	0	1	1

 -- !query 14
-SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING(course) FROM courseSales GROUP BY course, udf(year)
 -- !query 14 schema
 struct<>
 -- !query 14 output
 -214,7 +214,7  grouping() can only be used with GroupingSets/Cube/Rollup;

 -- !query 15
-SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING_ID(course, year) FROM courseSales GROUP BY udf(course), year
 -- !query 15 schema
 struct<>
 -- !query 15 output
 -223,7 +223,7  grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 16
-SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, udf(year)
 -- !query 16 schema
 struct<course:string,year:int,grouping__id:int>
 -- !query 16 output
 -240,7 +240,7  NULL	NULL	3

 -- !query 17
 SELECT course, year FROM courseSales GROUP BY CUBE(course, year)
-HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, year
+HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, udf(year)
 -- !query 17 schema
 struct<course:string,year:int>
 -- !query 17 output
 -250,7 +250,7  dotNET	NULL

 -- !query 18
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0
+SELECT course, udf(year) FROM courseSales GROUP BY udf(course), year HAVING GROUPING(course) > 0
 -- !query 18 schema
 struct<>
 -- !query 18 output
 -259,7 +259,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 19
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
+SELECT course, udf(udf(year)) FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
 -- !query 19 schema
 struct<>
 -- !query 19 output
 -268,9 +268,9  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 20
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
+SELECT udf(course), year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
 -- !query 20 schema
-struct<course:string,year:int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int>
 -- !query 20 output
 Java	NULL
 NULL	2012
 -281,7 +281,7  dotNET	NULL

 -- !query 21
 SELECT course, year, GROUPING(course), GROUPING(year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 21 schema
 struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint>
 -- !query 21 output
 -298,7 +298,7  NULL	NULL	1	1

 -- !query 22
 SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 22 schema
 struct<course:string,year:int,grouping_id(course, year):int>
 -- !query 22 output
 -314,7 +314,7  NULL	NULL	3

 -- !query 23
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING(course)
 -- !query 23 schema
 struct<>
 -- !query 23 output
 -323,7 +323,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 24
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING_ID(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING_ID(course)
 -- !query 24 schema
 struct<>
 -- !query 24 output
 -332,7 +332,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 25
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, udf(course), year
 -- !query 25 schema
 struct<course:string,year:int>
 -- !query 25 output
 -348,7 +348,7  NULL	NULL

 -- !query 26
-SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
+SELECT udf(a + b) AS k1, udf(b) AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
 -- !query 26 schema
 struct<k1:int,k2:int,sum((a - b)):bigint>
 -- !query 26 output
 -368,7 +368,7  NULL	NULL	3

 -- !query 27
-SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
+SELECT udf(udf(a + b)) AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
 -- !query 27 schema
 struct<k:int,b:int,sum((a - b)):bigint>
 -- !query 27 output
 -386,9 +386,9  NULL	NULL	3

 -- !query 28
-SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
+SELECT udf(a + b), udf(udf(b)) AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
 -- !query 28 schema
-struct<(a + b):int,k:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,k:int,sum((a - b)):bigint>
 -- !query 28 output
 NULL	1	3
 NULL	2	0

```

</p>
</details>

## How was this patch tested?
Tested as instructed in SPARK-27921.

Closes #25362 from skonto/group-analytics-followup.

Authored-by: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:00:28 +09:00
Jungtaek Lim (HeartSaVioR) 128ea37bda [SPARK-28601][CORE][SQL] Use StandardCharsets.UTF_8 instead of "UTF-8" string representation, and get rid of UnsupportedEncodingException
## What changes were proposed in this pull request?

This patch tries to keep consistency whenever UTF-8 charset is needed, as using `StandardCharsets.UTF_8` instead of using "UTF-8". If the String type is needed, `StandardCharsets.UTF_8.name()` is used.

This change also brings the benefit of getting rid of `UnsupportedEncodingException`, as we're providing `Charset` instead of `String` whenever possible.

This also changes some private Catalyst helper methods to operate on encodings as `Charset` objects rather than strings.

## How was this patch tested?

Existing unit tests.

Closes #25335 from HeartSaVioR/SPARK-28601.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-05 20:45:54 -07:00
Wenchen Fan 03e3006312 [SPARK-28213][SQL][FOLLOWUP] code cleanup and bug fix for columnar execution framework
## What changes were proposed in this pull request?

I did a post-hoc review of https://github.com/apache/spark/pull/25008 , and would like to propose some cleanups/fixes/improvements:

1. Do not track the scanTime metrics in `ColumnarToRowExec`. This metrics is specific to file scan, and doesn't make sense for a general batch-to-row operator.
2. Because of 2, we need to track scanTime when building RDDs in the file scan node.
3. use `RDD#mapPartitionsInternal` instead of `flatMap` in several places, as `mapPartitionsInternal` is created for Spark SQL and we use it in almost all the SQL operators.
4. Add `limitNotReachedCond` in `ColumnarToRowExec`. This was in the `ColumnarBatchScan` before and is critical for performance.
5. Clear the relationship between codegen stage and columnar stage. The whole-stage-codegen framework is completely row-based, so these 2 kinds of stages can NEVER overlap. When they are adjacent, it's either a `RowToColumnarExec` above `WholeStageExec`, or a `ColumnarToRowExec` above the `InputAdapter`.
6. Reuse the `ColumnarBatch` in `RowToColumnarExec`. We don't need to create a new one every time, just need to reset it.
7. Do not skip testing full scan node in `LogicalPlanTagInSparkPlanSuite`
8. Add back the removed tests in `WholeStageCodegenSuite`.

## How was this patch tested?

existing tests

Closes #25264 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:11:18 +08:00
Wenchen Fan 6fb79af48c [SPARK-28344][SQL] detect ambiguous self-join and fail the query
## What changes were proposed in this pull request?

This is an alternative solution of https://github.com/apache/spark/pull/24442 . It fails the query if ambiguous self join is detected, instead of trying to disambiguate it. The problem is that, it's hard to come up with a reasonable rule to disambiguate, the rule proposed by #24442 is mostly a heuristic.

### background of the self-join problem:
This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:
```
val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result
```
The root cause is, `Dataset.apply` is so powerful that users think it returns a column reference which can point to the column of the Dataset at anywhere. This is not true in many cases. `Dataset.apply` returns an `AttributeReference` . Different Datasets may share the same `AttributeReference`. In the example above, `df2` adds a Filter operator above the logical plan of `df1`, and the Filter operator reserves the output `AttributeReference` of its child. This means, `df1("a")` is exactly the same as `df2("a")`, and `df1("a") > df2("a")` always evaluates to false.

### The rule to detect ambiguous column reference caused by self join:
We can reuse the infra in #24442 :
1. each Dataset has a globally unique id.
2. the `AttributeReference` returned by `Dataset.apply` carries the ID and column position(e.g. 3rd column of the Dataset) via metadata.
3. the logical plan of a `Dataset` carries the ID via `TreeNodeTag`

When self-join happens, the analyzer asks the right side plan of join to re-generate output attributes with new exprIds. Based on it, a simple rule to detect ambiguous self join is:
1. find all column references (i.e. `AttributeReference`s with Dataset ID and col position) in the root node of a query plan.
2. for each column reference, traverse the query plan tree, find a sub-plan that carries Dataset ID and the ID is the same as the one in the column reference.
3. get the corresponding output attribute of the sub-plan by the col position in the column reference.
4. if the corresponding output attribute has a different exprID than the column reference, then it means this sub-plan is on the right side of a self-join and has regenerated its output attributes. This is an ambiguous self join because the column reference points to a table being self-joined.

## How was this patch tested?

existing tests and new test cases

Closes #25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:06:36 +08:00
Kousuke Saruta 794804ea5e [SPARK-28537][SQL] DebugExec cannot debug broadcast or columnar related queries
DebugExec does not implement doExecuteBroadcast and doExecuteColumnar so we can't debug broadcast or columnar related query.

One example for broadcast is here.
```
val df1 = Seq(1, 2, 3).toDF
val df2 = Seq(1, 2, 3).toDF
val joined = df1.join(df2, df1("value") === df2("value"))
joined.debug()

java.lang.UnsupportedOperationException: Debug does not implement doExecuteBroadcast
...
```

Another for columnar is here.
```
val df = Seq(1, 2, 3).toDF
df.persist
df.debug()

java.lang.IllegalStateException: Internal Error class org.apache.spark.sql.execution.debug.package$DebugExec has column support mismatch:
...
```

## How was this patch tested?

Additional test cases in DebuggingSuite.

Closes #25274 from sarutak/fix-debugexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-06 08:26:51 +09:00
wuyi 94499af6f0 [SPARK-28486][CORE][PYTHON] Map PythonBroadcast's data file to a BroadcastBlock to avoid delete by GC
## What changes were proposed in this pull request?

Currently, PythonBroadcast may delete its data file while a python worker still needs it. This happens because PythonBroadcast overrides the `finalize()` method to delete its data file. So, when GC happens and no  references on broadcast variable, it may trigger `finalize()` to delete
data file. That's also means, data under python Broadcast variable couldn't be deleted when `unpersist()`/`destroy()` called but relys on GC.

In this PR, we removed the `finalize()` method, and map the PythonBroadcast data file to a BroadcastBlock(which has the same broadcast id with the broadcast variable who wrapped this PythonBroadcast) when PythonBroadcast is deserializing. As a result, the data file could be deleted just like other pieces of the Broadcast variable when `unpersist()`/`destroy()` called and do not rely on GC any more.

## How was this patch tested?

Added a Python test, and tested manually(verified create/delete the broadcast block).

Closes #25262 from Ngone51/SPARK-28486.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-05 20:18:53 +09:00
John Zhuge cae500a255 [SPARK-28178][SQL][FOLLOWUP] DataSourceV2: DataFrameWriter.insertInfo
## What changes were proposed in this pull request?

- DataFrameWriter.insertInto should match column names by position.
- Clean up test cases.

## How was this patch tested?

New tests:
- insertInto: append by position
- insertInto: overwrite partitioned table in static mode by position
- insertInto: overwrite partitioned table in dynamic mode by position

Closes #25353 from jzhuge/SPARK-28178-bypos.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-05 16:05:23 +08:00
Yuanjian Li db39f45baf [SPARK-28593][CORE] Rename ShuffleClient to BlockStoreClient which more close to its usage
## What changes were proposed in this pull request?

After SPARK-27677, the shuffle client not only handles the shuffle block but also responsible for local persist RDD blocks. For better code scalability and precise semantics(as the [discussion](https://github.com/apache/spark/pull/24892#discussion_r300173331)), here we did several changes:

- Rename ShuffleClient to BlockStoreClient.
- Correspondingly rename the ExternalShuffleClient to ExternalBlockStoreClient, also change the server-side class from ExternalShuffleBlockHandler to ExternalBlockHandler.
- Move MesosExternalBlockStoreClient to Mesos package.

Note, we still keep the name of BlockTransferService, because the `Service` contains both client and server, also the name of BlockTransferService is not referencing shuffle client only.

## How was this patch tested?

Existing UT.

Closes #25327 from xuanyuanking/SPARK-28593.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-05 14:54:45 +08:00
Ryan Blue 0345f1174d [SPARK-27661][SQL] Add SupportsNamespaces API
## What changes were proposed in this pull request?

This adds an interface for catalog plugins that exposes namespace operations:
* `listNamespaces`
* `namespaceExists`
* `loadNamespaceMetadata`
* `createNamespace`
* `alterNamespace`
* `dropNamespace`

## How was this patch tested?

API only. Existing tests for regressions.

Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
2019-08-04 21:29:40 -07:00
Dongjoon Hyun ae08387b4c [SPARK-28616][INFRA] Improve merge-spark-pr script to warn WIP PRs and strip trailing dots
## What changes were proposed in this pull request?

This PR aims to improve the `merge-spark-pr` script in the following two ways.
1. `[WIP]` is useful when we show that a PR is not ready for merge. Apache Spark allows merging `WIP` PRs. However, sometime, we accidentally forgot to clean up the title for the completed PRs. We had better warn once more during merging stage and get a confirmation from the committers.
2. We have two kinds of PR titles in terms of the ending period. This PR aims to remove the trailing `dot` since the shorter is the better in the commit title. Also, the PR titles without the trailing `dot` is dominant in the Apache Spark commit logs.
```
$ git log --oneline | grep '[.]$' | wc -l
    4090
$ git log --oneline | grep '[^.]$' | wc -l
   20747
```

## How was this patch tested?

Manual.
```
$ dev/merge_spark_pr.py
git rev-parse --abbrev-ref HEAD
Which pull request would you like to merge? (e.g. 34): 25157

The PR title has `[WIP]`:
[WIP][SPARK-28396][SQL] Add PathCatalog for data source V2
Continue? (y/n):
```

```
$ dev/merge_spark_pr.py
git rev-parse --abbrev-ref HEAD
Which pull request would you like to merge? (e.g. 34): 25304
I've re-written the title as follows to match the standard format:
Original: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
Modified: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API
Would you like to use the modified title? (y/n):
```

Closes #25356 from dongjoon-hyun/SPARK-28616.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 21:23:54 -07:00
Yuming Wang 21a18c6490 [SPARK-28614][SQL][TEST] Do not remove leading write space in the golden result file
## What changes were proposed in this pull request?

It's hard to know if the query needs to be sorted like [`SQLQueryTestSuite.isSorted`](2ecc39c8d3/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala (L375-L380)) when building a test framework for Thriftserver. So we  can sort both  the `outputs` and  the `expectedOutputs. However, we removed leading write space in the golden result file. This can lead to inconsistent results.
This PR makes it does not remove leading write space in the golden result file. Trailing write space still needs to be removed.

## How was this patch tested?

N/A

Closes #25351 from wangyum/SPARK-28614.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 17:43:31 -07:00
Sean Owen c09675779b [SPARK-28604][ML] Use log1p(x) over log(1+x) and expm1(x) over exp(x)-1 for accuracy
## What changes were proposed in this pull request?

Use `log1p(x)` over `log(1+x)` and `expm1(x)` over `exp(x)-1` for accuracy, where possible. This should improve accuracy a tiny bit in ML-related calculations, and shouldn't hurt in any event.

## How was this patch tested?

Existing tests.

Closes #25337 from srowen/SPARK-28604.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-04 17:04:01 -05:00
Dongjoon Hyun 4856c0e33a [SPARK-28609][DOC] Fix broken styles/links and make up-to-date
## What changes were proposed in this pull request?

This PR aims to fix the broken styles/links and make the doc up-to-date for Apache Spark 2.4.4 and 3.0.0 release.

- `building-spark.md`
![Screen Shot 2019-08-02 at 10 33 51 PM](https://user-images.githubusercontent.com/9700541/62407962-a248ec80-b575-11e9-8a16-532e9bc421f8.png)

- `configuration.md`
![Screen Shot 2019-08-02 at 10 34 52 PM](https://user-images.githubusercontent.com/9700541/62407969-c7d5f600-b575-11e9-9b1a-a76c6cc095c5.png)

- `sql-pyspark-pandas-with-arrow.md`
![Screen Shot 2019-08-02 at 10 36 14 PM](https://user-images.githubusercontent.com/9700541/62407979-18e5ea00-b576-11e9-99af-7ad9264656ae.png)

- `streaming-programming-guide.md`
![Screen Shot 2019-08-02 at 10 37 11 PM](https://user-images.githubusercontent.com/9700541/62407981-213e2500-b576-11e9-8bc5-a925df7e98a7.png)

- `structured-streaming-programming-guide.md` (1/2)
![Screen Shot 2019-08-02 at 10 38 20 PM](https://user-images.githubusercontent.com/9700541/62408001-49c61f00-b576-11e9-9519-f699775ceecd.png)

- `structured-streaming-programming-guide.md` (2/2)
![Screen Shot 2019-08-02 at 10 40 05 PM](https://user-images.githubusercontent.com/9700541/62408017-7f6b0800-b576-11e9-9341-52664ba6b460.png)

- `submitting-applications.md`
![Screen Shot 2019-08-02 at 10 41 13 PM](https://user-images.githubusercontent.com/9700541/62408027-b2ad9700-b576-11e9-910e-8f22173e1251.png)

## How was this patch tested?

Manual. Build the doc.
```
SKIP_API=1 jekyll build
```

Closes #25345 from dongjoon-hyun/SPARK-28609.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 09:42:47 -07:00
WeichenXu b3394db193 [SPARK-28582][PYTHON] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

This PR picks up https://github.com/apache/spark/pull/25315 back after removing `Popen.wait` usage which exists in Python 3 only. I saw the last test results wrongly and thought it was passed.

Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed

Closes #25343 from HyukjinKwon/SPARK-28582.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-03 10:31:15 +09:00
Dongjoon Hyun 0c6874fb37 [SPARK-28606][INFRA] Update CRAN key to recover docker image generation
## What changes were proposed in this pull request?

CRAN repo changed the key and it causes our release script failure. This is a release blocker for Apache Spark 2.4.4 and 3.0.0.
- https://cran.r-project.org/bin/linux/ubuntu/README.html
```
Err:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease
  The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 51716619E084DAB9
...
W: GPG error: https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease: The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 51716619E084DAB9
E: The repository 'https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease' is not signed.
```

Note that they are reusing `cran35` for R 3.6 although they changed the key.
```
Even though R has moved to version 3.6, for compatibility the sources.list entry still uses the cran3.5 designation.
```

This PR aims to recover the docker image generation first. We will verify the R doc generation in a separate JIRA and PR.

## How was this patch tested?

Manual. After `docker-build.log`, it should continue to the next stage, `Building v3.0.0-rc1`.
```
$ dev/create-release/do-release-docker.sh -d /tmp/spark-3.0.0 -n -s docs
...
Log file: docker-build.log
Building v3.0.0-rc1; output will be at /tmp/spark-3.0.0/output
```

Closes #25339 from dongjoon-hyun/SPARK-28606.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-08-02 23:41:00 +00:00
yunzoud c212c9d9ed
[SPARK-28574][CORE] Allow to config different sizes for event queues
## What changes were proposed in this pull request?
Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue size.

## How was this patch tested?
Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Closes #25307 from yunzoud/SPARK-28574.

Authored-by: yunzoud <yun.zou@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-08-02 15:27:33 -07:00
Xiao Li 10d4ffd577 [SPARK-28532][SPARK-28530][SQL][FOLLOWUP] Inline doc for FixedPoint(1) batches "Subquery" and "Join Reorder"
## What changes were proposed in this pull request?
Explained why "Subquery" and "Join Reorder" optimization batches should be `FixedPoint(1)`, which was introduced in SPARK-28532 and SPARK-28530.

## How was this patch tested?

Existing UTs.

Closes #25320 from yeshengm/SPARK-28530-followup.

Lead-authored-by: Xiao Li <gatorsmile@gmail.com>
Co-authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-02 14:23:41 -07:00
Dongjoon Hyun 8ae032d78d Revert "[SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7"
This reverts commit fbeee0c5bc.
2019-08-02 10:14:20 -07:00
Jungtaek Lim (HeartSaVioR) 7ffc00ccc3 [MINOR][DOC][SS] Correct description of minPartitions in Kafka option
## What changes were proposed in this pull request?

`minPartitions` has been used as a hint and relevant method (KafkaOffsetRangeCalculator.getRanges) doesn't guarantee the behavior that partitions will be equal or more than given value.

d67b98ea01/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala (L32-L46)

This patch makes clear the configuration is a hint, and actual partitions could be less or more.

## How was this patch tested?

Just a documentation change.

Closes #25332 from HeartSaVioR/MINOR-correct-kafka-structured-streaming-doc-minpartition.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-02 09:12:54 -07:00
Sean Owen b148bd5ccb [SPARK-28519][SQL] Use StrictMath log, pow functions for platform independence
## What changes were proposed in this pull request?

See discussion on the JIRA (and dev). At heart, we find that math.log and math.pow can actually return slightly different results across platforms because of hardware optimizations. For the actual SQL log and pow functions, I propose that we should use StrictMath instead to ensure the answers are already the same. (This should have the benefit of helping tests pass on aarch64.)

Further, the atanh function (which is not part of java.lang.Math) can be implemented in a slightly different and more accurate way.

## How was this patch tested?

Existing tests (which will need to be changed).
Some manual testing locally to understand the numeric issues.

Closes #25279 from srowen/SPARK-28519.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:55:44 -05:00
actuaryzhang 6d7a6751d8 [SPARK-20604][ML] Allow imputer to handle numeric types
## What changes were proposed in this pull request?

Imputer currently requires input column to be Double or Float, but the logic should work on any numeric data types. Many practical problems have integer  data types, and it could get very tedious to manually cast them into Double before calling imputer. This transformer could be extended to handle all numeric types.

## How was this patch tested?
new test

Closes #17864 from actuaryzhang/imputer.

Lead-authored-by: actuaryzhang <actuaryzhang10@gmail.com>
Co-authored-by: Wayne Zhang <actuaryzhang@uber.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:54:50 -05:00
Huaxin Gao 660423d717 [SPARK-23469][ML] HashingTF should use corrected MurmurHash3 implementation
## What changes were proposed in this pull request?

Update HashingTF to use new implementation of MurmurHash3
Make HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded

## How was this patch tested?

Change existing unit tests. Also add one unit test to make sure HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded

Closes #25303 from huaxingao/spark-23469.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:53:36 -05:00
Yuming Wang efd92993f4 [SPARK-28510][SQL] Implement Spark's own GetFunctionsOperation
## What changes were proposed in this pull request?

This PR implements Spark's own GetFunctionsOperation which mitigates the differences between Spark SQL and Hive UDFs. But our implementation is different from Hive's implementation:
- Our implementation always returns results. Hive only returns results when [(null == catalogName || "".equals(catalogName)) && (null == schemaName || "".equals(schemaName))](https://github.com/apache/hive/blob/rel/release-3.1.1/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java#L101-L119).
- Our implementation pads the `REMARKS` field with the function usage - Hive returns an empty string.
- Our implementation does not support `FUNCTION_TYPE`, but Hive does.

## How was this patch tested?

unit tests

Closes #25252 from wangyum/SPARK-28510.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-02 08:50:42 -07:00
zhengruifeng 37243e160e [SPARK-28579][ML] MaxAbsScaler avoids conversion to breeze.vector
## What changes were proposed in this pull request?
avoid `.ml.vector => .breeze.vector` conversion in `MaxAbsScaler`,
and reuse the transformation method in `StandardScalerModel`, which can deal with dense & sparse vector separately.

## How was this patch tested?
existing suites

Closes #25311 from zhengruifeng/maxabs_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:38:10 -05:00
WeichenXu fbeee0c5bc [SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed

Closes #25315 from WeichenXu123/fix_py37_daemon.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 22:07:06 +09:00
Liang-Chi Hsieh 77c7e91e02 [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression
## What changes were proposed in this pull request?

When PythonUDF is used in group by, and it is also in aggregate expression, like

```
SELECT pyUDF(a + 1), COUNT(b) FROM testData GROUP BY pyUDF(a + 1)
```

It causes analysis exception in `CheckAnalysis`, like
```
org.apache.spark.sql.AnalysisException: expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function.
```

First, `CheckAnalysis` can't check semantic equality between PythonUDFs.
Second, even we make it possible, runtime exception will be thrown

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF1#8615
...
Cause: java.lang.RuntimeException: Couldn't find pythonUDF1#8615 in [cast(pythonUDF0#8614 as int)#8617,count(b#8599)#8607L]
```

The cause is, `ExtractPythonUDFs` extracts both PythonUDFs in group by and aggregate expression. The PythonUDFs are two different aliases now in the logical aggregate. In runtime, we can't bind the resulting expression in aggregate to its grouping and aggregate attributes.

This patch proposes a rule `ExtractGroupingPythonUDFFromAggregate` to extract PythonUDFs in group by and evaluate them before aggregate. We replace the group by PythonUDF in aggregate expression with aliased result.

The query plan of query `SELECT pyUDF(a + 1), pyUDF(COUNT(b)) FROM testData GROUP BY pyUDF(a + 1)`, like

```
== Optimized Logical Plan ==
Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- Aggregate [cast(groupingPythonUDF#8614 as int)], [cast(groupingPythonUDF#8614 as int) AS CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, count(b#8599) AS agg#8613L]
      +- Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
         +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
            +- LocalRelation [a#8598, b#8599]

== Physical Plan ==
*(3) Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- *(2) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int)#8617], functions=[count(b#8599)], output=[CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, agg#8613L])
      +- Exchange hashpartitioning(cast(groupingPythonUDF#8614 as int)#8617, 5), true
         +- *(1) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int) AS cast(groupingPythonUDF#8614 as int)#8617], functions=[partial_count(b#8599)], output=[cast(groupingPythonUDF#8614 as int)#8617, count#8619L])
            +- *(1) Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
               +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
                  +- LocalTableScan [a#8598, b#8599]
```

## How was this patch tested?

Added tests.

Closes #25215 from viirya/SPARK-28445.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 19:47:29 +09:00
Nick Karpov 6d32deeecc [SPARK-28475][CORE] Add regex MetricFilter to GraphiteSink
## What changes were proposed in this pull request?

Today all registered metric sources are reported to GraphiteSink with no filtering mechanism, although the codahale project does support it.

GraphiteReporter (ScheduledReporter) from the codahale project requires you implement and supply the MetricFilter interface (there is only a single implementation by default in the codahale project, MetricFilter.ALL).

Propose to add an additional regex config to match and filter metrics to the GraphiteSink

## How was this patch tested?

Included a GraphiteSinkSuite that tests:

1. Absence of regex filter (existing default behavior maintained)
2. Presence of `regex=<regexexpr>` correctly filters metric keys

Closes #25232 from nkarpov/graphite_regex.

Authored-by: Nick Karpov <nick@nickkarpov.com>
Signed-off-by: jerryshao <jerryshao@tencent.com>
2019-08-02 17:50:15 +08:00
Yuming Wang 4e7a4cd20e [SPARK-28521][SQL] Fix error message for built-in functions
## What changes were proposed in this pull request?

```sql
spark-sql> select cast(1);
19/07/26 00:54:17 ERROR SparkSQLDriver: Failed in [select cast(1)]
java.lang.UnsupportedOperationException: empty.init
	at scala.collection.TraversableLike$class.init(TraversableLike.scala:451)
	at scala.collection.mutable.ArrayOps$ofInt.scala$collection$IndexedSeqOptimized$$super$init(ArrayOps.scala:234)
	at scala.collection.IndexedSeqOptimized$class.init(IndexedSeqOptimized.scala:135)
	at scala.collection.mutable.ArrayOps$ofInt.init(ArrayOps.scala:234)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:565)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:558)
	at scala.Option.getOrElse(Option.scala:121)
```

The reason is that we did not handle the case [`validParametersCount.length == 0`](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L588)) because the [parameter types](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L589)) can be `Expression`, `DataType` and `Option`. This PR makes it  handle the case `validParametersCount.length == 0`.

## How was this patch tested?

unit tests

Closes #25261 from wangyum/SPARK-28521.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-01 18:02:50 -05:00
Marcelo Vanzin 607fb87906 [SPARK-28584][CORE] Fix thread safety issue in blacklist timer, tests
There's a small, probably very hard to hit thread-safety issue in the blacklist
abort timers in the task scheduler, where they access a non-thread-safe map without
locks.

In the tests, the code was also calling methods on the TaskSetManager without
holding the proper locks, which could cause threads to call non-thread-safe
TSM methods concurrently.

Closes #25317 from vanzin/SPARK-28584.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:37:47 -07:00
HyukjinKwon 8e1602a04f [SPARK-28568][SHUFFLE][DOCS] Make Javadoc in org.apache.spark.shuffle.api visible
## What changes were proposed in this pull request?

This PR proposes to make Javadoc in org.apache.spark.shuffle.api visible.

## How was this patch tested?

Manually built the doc and checked:

![Screen Shot 2019-08-01 at 4 48 23 PM](https://user-images.githubusercontent.com/6477701/62275587-400cc080-b47d-11e9-8fba-c4a0607093d1.png)

Closes #25323 from HyukjinKwon/SPARK-28568.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:24:29 -07:00
HyukjinKwon 24c1bc2483 [SPARK-28586][INFRA] Make merge-spark-pr script compatible with Python 3
## What changes were proposed in this pull request?

This PR proposes to make `merge_spark_pr.py` script Python 3 compatible.

## How was this patch tested?

Manually tested against my forked remote with the PR and JIRA below:

https://github.com/apache/spark/pull/25321
https://github.com/apache/spark/pull/25286
https://issues.apache.org/jira/browse/SPARK-28153

Closes #25322 from HyukjinKwon/merge-script.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:17:17 -07:00
zhengruifeng b29829e2ab [SPARK-25584][ML][DOC] datasource for libsvm user guide
## What changes were proposed in this pull request?
it seems that doc for libsvm datasource is not added in https://github.com/apache/spark/pull/22675.
This pr is to add it.

## How was this patch tested?
doc built locally
![图片](https://user-images.githubusercontent.com/7322292/62044350-4ad51480-b235-11e9-8f09-cbcbe9d3b7f9.png)

Closes #25286 from zhengruifeng/doc_libsvm_data_source.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-01 09:15:42 -05:00
Wing Yew Poon 80ab19b9fd [SPARK-26329][CORE] Faster polling of executor memory metrics.
## What changes were proposed in this pull request?

Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval.

For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.

We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`.

We continue to do the stage-level aggregation in the EventLoggingListener.

For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.

## How was this patch tested?

Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.

Closes #23767 from wypoon/wypoon_SPARK-26329.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-08-01 09:09:46 -05:00
WeichenXu 26d03b62e2 [SPARK-28366][CORE] Logging in driver when loading single large unsplittable file
## What changes were proposed in this pull request?

Logging in driver when loading single large unsplittable file via `sc.textFile` or csv/json datasouce.
Current condition triggering logging is
* only generate one partition
* file is unsplittable, possible reason is:
   - compressed by unsplittable compression algo such as gzip.
   - multiLine mode in csv/json datasource
   - wholeText mode in text datasource
* file size exceed the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)

## How was this patch tested?

Manually test.
Generate one gzip file exceeding 1GB,
```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```
then launch spark-shell,

run
```
sc.textFile("file:///path/to/file1.gz").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```

run
```
sc.textFile("file:///path/to/file1.txt").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```

run
```
spark.read.csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```

run
```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```

JSON and Text datasource also tested with similar cases.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25134 from WeichenXu123/log_gz.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-01 20:29:18 +08:00
Marco Gaido ee41001949 [SPARK-26218][SQL] Overflow on arithmetic operations returns incorrect result
## What changes were proposed in this pull request?

When an overflow occurs performing an arithmetic operation, we are returning an incorrect value. Instead, we should throw an exception, as stated in the SQL standard.

## How was this patch tested?

added UT + existing UTs (improved)

Closes #21599 from mgaido91/SPARK-24598.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-01 14:51:38 +08:00
Marcelo Vanzin b3ffd8be14 [SPARK-24352][CORE][TESTS] De-flake StandaloneDynamicAllocationSuite blacklist test
The issue is that the test tried to stop an existing scheduler and replace it with
a new one set up for the test. That can cause issues because both were sharing the
same RpcEnv underneath, and unregistering RpcEndpoints is actually asynchronous
(see comment in Dispatcher.unregisterRpcEndpoint). So that could lead to races where
the new scheduler tried to register before the old one was fully unregistered.

The updated test avoids the issue by using a separate RpcEnv / scheduler instance
altogether, and also avoids a misleading NPE in the test logs.

Closes #25318 from vanzin/SPARK-24352.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-31 17:44:20 -07:00
sychen 70ef9064a8 [SPARK-28564][CORE] Access history application defaults to the last attempt id
## What changes were proposed in this pull request?
When we set ```spark.history.ui.maxApplications``` to a small value, we can't get some apps from the page search.
If the url is spliced (http://localhost:18080/history/local-xxx), it can be accessed if the app has no attempt.
But in the case of multiple attempted apps, such a url cannot be accessed, and the page displays Not Found.

## How was this patch tested?
Add UT

Closes #25301 from cxzl25/hs_app_last_attempt_id.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-07-31 13:24:36 -07:00
Yuming Wang 3002a3bf3c [SPARK-28581][SQL] Replace _FUNC_ in UDF ExpressionInfo
## What changes were proposed in this pull request?

This PR moves `replaceFunctionName(usage: String, functionName: String)`
from `DescribeFunctionCommand` to `ExpressionInfo` in order to make `ExpressionInfo` returns actual name instead of placeholder. We can get `ExpressionInfo`s directly through `SessionCatalog.lookupFunctionInfo` API and get the real names.

## How was this patch tested?

unit tests

Closes #25314 from wangyum/SPARK-28581.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-31 13:08:49 -07:00
HyukjinKwon b8e13b0aea [SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF)
## What changes were proposed in this pull request?

This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.

2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.

3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at https://github.com/apache/spark/pull/24958#discussion_r297203041.

## How was this patch tested?

Manually tested and unittest was added.

Closes #24958 from HyukjinKwon/SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-31 22:40:01 +08:00
gengjiaan d03ec65f01 [SPARK-27924][SQL] Support ANSI SQL Boolean-Predicate syntax
## What changes were proposed in this pull request?

This PR aims to support ANSI SQL `Boolean-Predicate` syntax.
```sql
expression IS [NOT] TRUE
expression IS [NOT] FALSE
expression IS [NOT] UNKNOWN
```

There are some mainstream database support this syntax.
- **PostgreSQL:**  https://www.postgresql.org/docs/9.1/functions-comparison.html
- **Hive:** https://issues.apache.org/jira/browse/HIVE-13583
- **Redshift:** https://docs.aws.amazon.com/redshift/latest/dg/r_Boolean_type.html
- **Vertica:** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Predicates/Boolean-predicate.htm

For example:
```sql
spark-sql> select null is true, null is not true;
false	true

spark-sql> select false is true, false is not true;
false	true

spark-sql> select true is true, true is not true;
true	false

spark-sql> select null is false, null is not false;
false	true

spark-sql> select false is false, false is not false;
true	false

spark-sql> select true is false,  true is not false;
false	true

spark-sql> select null is unknown, null is not unknown;
true	false

spark-sql> select false is unknown, false is not unknown;
false	true

spark-sql> select true is unknown, true is not unknown;
false	true
```
**Note**: A null input is treated as the logical value "unknown".

## How was this patch tested?

Pass the Jenkins with the newly added test cases.

Closes #25074 from beliefer/ansi-sql-boolean-test.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-30 23:59:50 -07:00
WeichenXu a745381b9d [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
## What changes were proposed in this pull request?

I remove the deprecate `ImageSchema.readImages`.
Move some useful methods from class `ImageSchema` into class `ImageFileFormat`.

In pyspark, I rename `ImageSchema` class to be `ImageUtils`, and keep some useful python methods in it.

## How was this patch tested?

UT.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25245 from WeichenXu123/remove_image_schema.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 14:26:18 +09:00
gengjiaan dba4375359 [MINOR][CORE][DOCS] Fix inconsistent description of showConsoleProgress
## What changes were proposed in this pull request?

The latest docs http://spark.apache.org/docs/latest/configuration.html contains some description as below:

spark.ui.showConsoleProgress | true | Show the progress bar in the console. The progress bar shows the progress of stages that run for longer than 500ms. If multiple stages run at the same time, multiple progress bars will be displayed on the same line.
-- | -- | --

But the class `org.apache.spark.internal.config.UI` define the config `spark.ui.showConsoleProgress` as below:
```
val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
    .doc("When true, show the progress bar in the console.")
    .booleanConf
    .createWithDefault(false)
```
So I think there are exists some little mistake and lead to confuse reader.

## How was this patch tested?

No need UT.

Closes #25297 from beliefer/inconsistent-desc-showConsoleProgress.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 12:17:44 +09:00
Yuming Wang 261e113449 [SPARK-28038][SQL][TEST] Port text.sql
## What changes were proposed in this pull request?

This PR is to port text.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/text.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/text.out

When porting the test cases, found a PostgreSQL specific features that do not exist in Spark SQL:
[SPARK-28037](https://issues.apache.org/jira/browse/SPARK-28037): Add built-in String Functions: quote_literal

Also, found three inconsistent behavior:
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Spark SQL's format_string can not fully support PostgreSQL's format
[SPARK-28036](https://issues.apache.org/jira/browse/SPARK-28036):  Built-in udf left/right has inconsistent behavior
[SPARK-28033](https://issues.apache.org/jira/browse/SPARK-28033): String concatenation should low priority than other operators

## How was this patch tested?

N/A

Closes #24862 from wangyum/SPARK-28038.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 11:36:26 +09:00
WeichenXu 3b14088541 [SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon
## What changes were proposed in this pull request?

PySpark worker daemon reads from stdin the worker PIDs to kill. 1bb60ab839/python/pyspark/daemon.py (L127)

However, the worker process is a forked process from the worker daemon process and we didn't close stdin on the child after fork. This means the child and user program can read stdin as well, which blocks daemon from receiving the PID to kill. This can cause issues because the task reaper might detect the task was not terminated and eventually kill the JVM.

This PR fix this by redirecting the standard input of the forked child to devnull.

## How was this patch tested?

Manually test.

In `pyspark`, run:
```
import subprocess
def task(_):
  subprocess.check_output(["cat"])

sc.parallelize(range(1), 1).mapPartitions(task).count()
```

Before:
The job will get stuck and press Ctrl+C to exit the job but the python worker process do not exit.
After:
The job finish correctly. The "cat" print nothing (because the dummay stdin is "/dev/null").
The python worker process exit normally.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25138 from WeichenXu123/SPARK-26175.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 09:10:24 +09:00
Dilip Biswal ee3c1c777d [SPARK-28375][SQL] Make pullupCorrelatedPredicate idempotent
## What changes were proposed in this pull request?

This PR makes the optimizer rule PullupCorrelatedPredicates idempotent.
## How was this patch tested?

A new test PullupCorrelatedPredicatesSuite

Closes #25268 from dilipbiswal/pr-25164.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-07-30 16:29:24 -07:00
mcheah abef84a868 [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
## What changes were proposed in this pull request?

As part of the shuffle storage API proposed in SPARK-25299, this introduces an API for persisting shuffle data in arbitrary storage systems.

This patch introduces several concepts:
* `ShuffleDataIO`, which is the root of the entire plugin tree that will be proposed over the course of the shuffle API project.
* `ShuffleExecutorComponents` - the subset of plugins for managing shuffle-related components for each executor. This will in turn instantiate shuffle readers and writers.
* `ShuffleMapOutputWriter` interface - instantiated once per map task. This provides child `ShufflePartitionWriter` instances for persisting the bytes for each partition in the map task.

The default implementation of these plugins exactly mirror what was done by the existing shuffle writing code - namely, writing the data to local disk and writing an index file. We leverage the APIs in the `BypassMergeSortShuffleWriter` only. Follow-up PRs will use the APIs in `SortShuffleWriter` and `UnsafeShuffleWriter`, but are left as future work to minimize the review surface area.

## How was this patch tested?

New unit tests were added. Micro-benchmarks indicate there's no slowdown in the affected code paths.

Closes #25007 from mccheah/spark-shuffle-writer-refactor.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-07-30 14:17:30 -07:00
Kousuke Saruta 121f9338ce [SPARK-28525][DEPLOY] Allow Launcher to be applied Java options
Launcher is implemented as a Java application and sometimes I'd like to apply Java options.
One situation I have met is the time I try to attach debugger to Launcher.

Launcher is launched from bin/spark-class but there is no room to apply Java options.

```
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$"
  printf "%d\0" $?
}
```

Considering that it's not so many times to apply Java options to Launcher,  one compromise would just modify spark-class by user like as follows.

```
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$"
  printf "%d\0" $?
}
```

But it doesn't work when any text related to Java options is output to standard output because whole output is used as command-string for spark-shell and spark-submit in current implementation.

One example is jdwp. When apply agentlib option to use jdwp for debug, we will get output like as follows.

```
Listening for transport dt_socket at address: 9876
```

The output shown above is not a command-string so spark-submit and spark-shell will fail.
To enable Java options for Launcher, we need treat command-string and others.

I changed launcher/Main.java and bin/spark-class to print separator-character and treat it.

## How was this patch tested?

Tested manually using Spark Shell with / without LAUNCHER_JAVA_OPTIONS like as follows.

```
SPARK_LAUNCHER_OPTS="-agentlib:jdwp=transport=dt_socket,suspend=y,address=localhost:9876,server=y" bin/spark-shell
```

Closes #25265 from sarutak/add-spark-launcher-opts.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-07-30 12:45:32 -07:00
zhengruifeng 44c28d7515 [SPARK-28399][ML][PYTHON] implement RobustScaler
## What changes were proposed in this pull request?
Implement `RobustScaler`
Since the transformation is quite similar to `StandardScaler`, I refactor the transform function so that it can be reused in both scalers.

## How was this patch tested?
existing and added tests

Closes #25160 from zhengruifeng/robust_scaler.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-30 10:24:33 -05:00
pgandhi 70910e6ad0 [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks…
… more efficiently

This PR improves the performance of scheduling speculative tasks to be O(1) instead of O(numSpeculativeTasks), using the same approach used for scheduling regular tasks. The performance of this method is particularly important because a lock is held on the TaskSchedulerImpl which is a bottleneck for all scheduling operations. We ran a Join query on a large dataset with speculation enabled and out of 100000 tasks for the ShuffleMapStage, the maximum number of speculatable tasks that was noted was close to 7900-8000 at a point. That is when we start seeing the bottleneck on the scheduler lock.

In particular, this works by storing a separate stack of tasks by executor, node, and rack locality preferences. Then when trying to schedule a speculative task, rather than scanning all speculative tasks to find ones which match the given executor (or node, or rack) preference, we can jump to a quick check of tasks matching the resource offer. This technique was already used for regular tasks -- this change refactors the code to allow sharing with regular and speculative task execution.

## What changes were proposed in this pull request?

Have split the main queue "speculatableTasks" into 5 separate queues based on locality preference similar to how normal tasks are enqueued. Thus, the "dequeueSpeculativeTask" method will avoid performing locality checks for each task at runtime and simply return the preferable task to be executed.

## How was this patch tested?
We ran a spark job that performed a join on a 10 TB dataset to test the code change.
Original Code:
<img width="1433" alt="screen shot 2019-01-28 at 5 07 22 pm" src="https://user-images.githubusercontent.com/22228190/51873321-572df280-2322-11e9-9149-0aae08d5edc6.png">

Optimized Code:
<img width="1435" alt="screen shot 2019-01-28 at 5 08 19 pm" src="https://user-images.githubusercontent.com/22228190/51873343-6745d200-2322-11e9-947b-2cfd0f06bcab.png">

As you can see, the run time of the ShuffleMapStage came down from 40 min to 6 min approximately, thus, reducing the overall running time of the spark job by a significant amount.

Another example for the same job:

Original Code:
<img width="1440" alt="screen shot 2019-01-28 at 5 11 30 pm" src="https://user-images.githubusercontent.com/22228190/51873355-70cf3a00-2322-11e9-9c3a-af035449a306.png">

Optimized Code:
<img width="1440" alt="screen shot 2019-01-28 at 5 12 16 pm" src="https://user-images.githubusercontent.com/22228190/51873367-7dec2900-2322-11e9-8d07-1b1b49285f71.png">

Closes #23677 from pgandhi999/SPARK-26755.

Lead-authored-by: pgandhi <pgandhi@verizonmedia.com>
Co-authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-07-30 09:54:51 -05:00
Yuming Wang 2656c9d304 [SPARK-28071][SQL][TEST] Port strings.sql
## What changes were proposed in this pull request?

This PR is to port strings.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/strings.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/strings.out

When porting the test cases, found nine PostgreSQL specific features that do not exist in Spark SQL:
[SPARK-28076](https://issues.apache.org/jira/browse/SPARK-28076): Support regular expression substring
[SPARK-28078](https://issues.apache.org/jira/browse/SPARK-28078):  Add support other 4 REGEXP functions
[SPARK-28412](https://issues.apache.org/jira/browse/SPARK-28412): OVERLAY function support byte array
[SPARK-28083](https://issues.apache.org/jira/browse/SPARK-28083):  ANSI SQL: LIKE predicate: ESCAPE clause
[SPARK-28087](https://issues.apache.org/jira/browse/SPARK-28087):  Add support split_part
[SPARK-28122](https://issues.apache.org/jira/browse/SPARK-28122): Missing `sha224`/`sha256 `/`sha384 `/`sha512 ` functions
[SPARK-28123](https://issues.apache.org/jira/browse/SPARK-28123): Add support string functions: btrim
[SPARK-28448](https://issues.apache.org/jira/browse/SPARK-28448): Implement ILIKE operator
[SPARK-28449](https://issues.apache.org/jira/browse/SPARK-28449): Missing escape_string_warning and standard_conforming_strings config

Also, found five inconsistent behavior:
[SPARK-27952](https://issues.apache.org/jira/browse/SPARK-27952): String Functions: regexp_replace is not compatible
[SPARK-28121](https://issues.apache.org/jira/browse/SPARK-28121): decode can not accept 'escape' as charset
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Replace `strpos` with `locate` or `position` in Spark SQL
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Replace `to_hex` with `hex ` or in Spark SQL
[SPARK-28451](https://issues.apache.org/jira/browse/SPARK-28451): `substr` returns different values

## How was this patch tested?

N/A

Closes #24923 from wangyum/SPARK-28071.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-07-30 18:54:14 +09:00