Commit graph

24863 commits

Author SHA1 Message Date
Rob Vesse 6ea4a737ea [SPARK-28649][INFRA] Add Python .eggs to .gitignore
## What changes were proposed in this pull request?

If you build Spark distributions you potentially end up with a `python/.eggs` directory in your working copy which is not currently ignored by Spark's `.gitignore` file.  Since these are transient build artifacts there is no reason to ever commit these to Git so this should be placed in the `.gitignore` list

## How was this patch tested?

Verified the offending artifacts were no longer reported as untracked content by Git

Closes #25380 from rvesse/patch-1.

Authored-by: Rob Vesse <rvesse@dotnetrdf.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-07 16:55:11 -07:00
Yuming Wang eeaf1851b2 [SPARK-28617][SQL][TEST] Fix misplacement when comment is at the end of the query
## What changes were proposed in this pull request?

This PR fixes the issue of misplacement when the comment at the end of the query. Example:
Comment for ` SELECT date '5874898-01-01'`:
2d74f14d74/sql/core/src/test/resources/sql-tests/inputs/pgSQL/date.sql (L200)
But the golden file is:
a5a5da78cf/sql/core/src/test/resources/sql-tests/results/pgSQL/date.sql.out (L484-L507)

After this PR:
eeb7405ad0/sql/core/src/test/resources/sql-tests/results/pgSQL/date.sql.out (L482-L501)

## How was this patch tested?

N/A

Closes #25357 from wangyum/SPARK-28617.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-07 16:45:23 -07:00
Gengliang Wang c88df2ccf6 [SPARK-28331][SQL] Catalogs.load() should be able to load built-in catalogs
## What changes were proposed in this pull request?

In `Catalogs.load`, the `pluginClassName` in the following code
```
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
```
is always null for built-in catalogs, e.g there is a SQLConf entry `spark.sql.catalog.session`.

This is because of https://github.com/apache/spark/pull/18852: SQLConf.conf.getConfString(key, null) always returns null.

## How was this patch tested?

Apply code changes of https://github.com/apache/spark/pull/24768 and tried loading session catalog.

Closes #25094 from gengliangwang/fixCatalogLoad.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
2019-08-07 16:14:34 -07:00
Marco Gaido 8617bf6ff8 [SPARK-28470][SQL] Cast to decimal throws ArithmeticException on overflow
## What changes were proposed in this pull request?

The flag `spark.sql.decimalOperations.nullOnOverflow` is not honored by the `Cast` operator. This means that a casting which causes an overflow currently returns `null`.

The PR makes `Cast` respecting that flag, ie. when it is turned to false and a decimal overflow occurs, an exception id thrown.

## How was this patch tested?

Added UT

Closes #25253 from mgaido91/SPARK-28470.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-08 08:10:21 +09:00
maryannxue 325bc8e9c6 [SPARK-28583][SQL] Subqueries should not call onUpdatePlan in Adaptive Query Execution
## What changes were proposed in this pull request?

Subqueries do not have their own execution id, thus when calling `AdaptiveSparkPlanExec.onUpdatePlan`, it will actually get the `QueryExecution` instance of the main query, which is wasteful and problematic. It could cause issues like stack overflow or dead locks in some circumstances.

This PR fixes this issue by making `AdaptiveSparkPlanExec` compare the `QueryExecution` object retrieved by current execution ID against the `QueryExecution` object from which this plan is created, and only update the UI when the two instances are the same.

## How was this patch tested?

Manual tests on TPC-DS queries.

Closes #25316 from maryannxue/aqe-updateplan-fix.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: herman <herman@databricks.com>
2019-08-07 22:10:17 +02:00
Yuming Wang a59fdc4b57 [SPARK-28472][SQL][TEST] Add test for thriftserver protocol versions
## What changes were proposed in this pull request?

This pr adds a test(`SparkThriftServerProtocolVersionsSuite`) to test different versions of the thrift protocol because we use different logic to handle the `RowSet`:
02c33694c8/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/RowSetFactory.java (L28-L40)

When adding this test cases, found three bugs:
[SPARK-26969](https://issues.apache.org/jira/browse/SPARK-26969): Using ODBC not able to see the data in table when datatype is decimal
[SPARK-28463](https://issues.apache.org/jira/browse/SPARK-28463): Thriftserver throws BigDecimal incompatible with HiveDecimal
[SPARK-28474](https://issues.apache.org/jira/browse/SPARK-28474): Lower JDBC client version(Hive 0.12) cannot read binary type

## How was this patch tested?

N/A

Closes #25228 from wangyum/SPARK-28472.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-07 08:51:58 -07:00
Wenchen Fan 469423f338 [SPARK-28595][SQL] explain should not trigger partition listing
## What changes were proposed in this pull request?

Sometimes when you explain a query, you will get stuck for a while. What's worse, you will get stuck again if you explain again.

This is caused by `FileSourceScanExec`:
1. In its `toString`, it needs to report the number of partitions it reads. This needs to query the hive metastore.
2. In its `outputOrdering`, it needs to get all the files. This needs to query the hive metastore.

This PR fixes by:
1. `toString` do not need to report the number of partitions it reads. We should report it via SQL metrics.
2. The `outputOrdering` is not very useful. We can only apply it if a) all the bucket columns are read. b) there is only one file in each bucket. This condition is really hard to meet, and even if we meet, sorting an already sorted file is pretty fast and avoiding the sort is not that useful. I think it's worth to give up this optimization so that explain don't need to get stuck.

## How was this patch tested?

existing tests

Closes #25328 from cloud-fan/ui.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-07 19:14:25 +08:00
gengjiaan 99de6a4240 [SPARK-27924][SQL][TEST][FOLLOW-UP] Enable Boolean-Predicate syntax tests
## What changes were proposed in this pull request?

This PR is a follow-up to https://github.com/apache/spark/pull/25074

## How was this patch tested?

Pass the Jenkins with the newly update test files.

Closes #25366 from beliefer/uncomment-boolean-test.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-07 00:34:49 -07:00
mcheah 44e607e921 [SPARK-28238][SQL] Implement DESCRIBE TABLE for Data Source V2 Tables
## What changes were proposed in this pull request?

Implements the `DESCRIBE TABLE` logical and physical plans for data source v2 tables.

## How was this patch tested?

Added unit tests to `DataSourceV2SQLSuite`.

Closes #25040 from mccheah/describe-table-v2.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-07 14:26:45 +08:00
WeichenXu a133175ffa [SPARK-28615][SQL][DOCS] Add a guide line for dataframe functions to say column signature function is by default
## What changes were proposed in this pull request?

Add a guide line for dataframe functions, say:
```
This function APIs usually have methods with Column signature only because it can support not only Column but also other types such as a native string. The other variants currently exist for historical reasons.
```

## How was this patch tested?

N/A

Closes #25355 from WeichenXu123/update_functions_guide2.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-07 10:39:47 +09:00
Nik Vanderhoof 9e931e787d [SPARK-27905][SQL] Add higher order function 'forall'
## What changes were proposed in this pull request?

Add's the higher order function `forall`, which tests an array to see if a predicate holds for every element.
The function is implemented in `org.apache.spark.sql.catalyst.expressions.ArrayForAll`.
The function is added to the function registry under the pretty name `forall`.

## How was this patch tested?

I've added appropriate unit tests for the new ArrayForAll expression in
`sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala`.

Also added tests for the function in `sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala`.

Not sure who is best to ask about this PR so:
 HyukjinKwon rxin gatorsmile ueshin srowen hvanhovell gatorsmile

Closes #24761 from nvander1/feature/for_all.

Lead-authored-by: Nik Vanderhoof <nikolasrvanderhoof@gmail.com>
Co-authored-by: Nik <nikolasrvanderhoof@gmail.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2019-08-06 14:25:53 -07:00
Maxim Gekk 9e3aab8b95 [SPARK-28623][SQL] Support dow, isodow and doy by extract()
## What changes were proposed in this pull request?

In the PR, I propose to use existing expressions `DayOfYear`, `WeekDay` and `DayOfWeek`, and support additional parameters of `extract()` for feature parity with PostgreSQL (https://www.postgresql.org/docs/11/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT):

1. `dow` - the day of the week as Sunday (0) to Saturday (6)
2. `isodow` - the day of the week as Monday (1) to Sunday (7)
3. `doy` - the day of the year (1 - 365/366)

Here are examples:
```sql
spark-sql> SELECT EXTRACT(DOW FROM TIMESTAMP '2001-02-16 20:38:40');
5
spark-sql> SELECT EXTRACT(ISODOW FROM TIMESTAMP '2001-02-18 20:38:40');
7
spark-sql> SELECT EXTRACT(DOY FROM TIMESTAMP '2001-02-16 20:38:40');
47
```

## How was this patch tested?

Updated `extract.sql`.

Closes #25367 from MaxGekk/extract-ext.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-06 13:39:49 -07:00
zhengruifeng c17fa1360d [SPARK-28514][ML] Remove the redundant transformImpl method in RF & GBT
## What changes were proposed in this pull request?
Remove the redundant and confusing transformImpl method in RF & GBT;
1, In `GBTClassifier` & `RandomForestClassifier`, the real `transform` methods inherit from `ProbabilisticClassificationModel` which can deal with multi output columns.
The `transformImpl` method, which deals with only one column - `predictionCol`, completely does nothing. This is quite confusing.

2, In `GBTRegressor` & `RandomForestRegressor`, the `transformImpl` do exactly what the superclass `PredictionModel` does (except model broadcasting), so can be removed.

## How was this patch tested?
existing suites

Closes #25256 from zhengruifeng/del_ensamble_transformImpl.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-06 15:12:47 -05:00
HyukjinKwon bab88c48b1 [SPARK-28622][SQL][PYTHON] Rename PullOutPythonUDFInJoinCondition to ExtractPythonUDFFromJoinCondition and move to 'Extract Python UDFs'
## What changes were proposed in this pull request?

This PR targets to rename `PullOutPythonUDFInJoinCondition` to `ExtractPythonUDFFromJoinCondition` and move to 'Extract Python UDFs' together with other Python UDF related rules.

Currently `PullOutPythonUDFInJoinCondition` rule is alone outside of other 'Extract Python UDFs' rules together.

and the name `ExtractPythonUDFFromJoinCondition` is matched to existing Python UDF extraction rules.

## How was this patch tested?

Existing tests should cover.

Closes #25358 from HyukjinKwon/move-python-join-rule.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-05 23:36:35 -07:00
Udbhav30 150dbc5dc2 [SPARK-28391][PYTHON][SQL][TESTS][FOLLOW-UP] Add UDF cases into groupby clause in 'pgSQL/select_implicit.sql'
## What changes were proposed in this pull request?
This PR adds UDF cases into group by clause in 'pgSQL/select_implicit.sql'

<details><summary>Diff comparing to 'pgSQL/select_implicit.sql'</summary>
<p>

```diff
diff --git a/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out b/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out
index 17303b2..0675820 100755
--- a/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out
+++ b/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out
 -91,11 +91,9  struct<>

 -- !query 11
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY
-udf(test_missing_target.c)
-ORDER BY udf(c)
+SELECT c, count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
 -- !query 11 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<c:string,count(1):bigint>
 -- !query 11 output
 ABAB	2
 BBBB	2
 -106,10 +104,9  cccc	2

 -- !query 12
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(test_missing_target.c)
-ORDER BY udf(c)
+SELECT count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
 -- !query 12 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 12 output
 2
 2
 -120,18 +117,18  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 13
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(a) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target GROUP BY a ORDER BY b
 -- !query 13 schema
 struct<>
 -- !query 13 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [CAST(udf(cast(count(1) as string)) AS BIGINT)]; line 1 pos 75
+cannot resolve '`b`' given input columns: [count(1)]; line 1 pos 61

 -- !query 14
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b
 -- !query 14 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 14 output
 1
 2
 -140,10 +137,10  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 15
-SELECT udf(test_missing_target.b), udf(count(*))
-  FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
+SELECT test_missing_target.b, count(*)
+  FROM test_missing_target GROUP BY b ORDER BY b
 -- !query 15 schema
-struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<b:int,count(1):bigint>
 -- !query 15 output
 1	1
 2	2
 -152,9 +149,9  struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)

 -- !query 16
-SELECT udf(c) FROM test_missing_target ORDER BY udf(a)
+SELECT c FROM test_missing_target ORDER BY a
 -- !query 16 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string>
+struct<c:string>
 -- !query 16 output
 XXXX
 ABAB
 -169,10 +166,9  CCCC

 -- !query 17
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
-desc
+SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b desc
 -- !query 17 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 17 output
 4
 3
 -181,17 +177,17  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 18
-SELECT udf(count(*)) FROM test_missing_target ORDER BY udf(1) desc
+SELECT count(*) FROM test_missing_target ORDER BY 1 desc
 -- !query 18 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 18 output
 10

 -- !query 19
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 1 ORDER BY 1
+SELECT c, count(*) FROM test_missing_target GROUP BY 1 ORDER BY 1
 -- !query 19 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<c:string,count(1):bigint>
 -- !query 19 output
 ABAB	2
 BBBB	2
 -202,30 +198,30  cccc	2

 -- !query 20
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3
+SELECT c, count(*) FROM test_missing_target GROUP BY 3
 -- !query 20 schema
 struct<>
 -- !query 20 output
 org.apache.spark.sql.AnalysisException
-GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63
+GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53

 -- !query 21
-SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(b) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY b ORDER BY b
 -- !query 21 schema
 struct<>
 -- !query 21 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 14
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 22
-SELECT udf(a), udf(a) FROM test_missing_target
-	ORDER BY udf(a)
+SELECT a, a FROM test_missing_target
+	ORDER BY a
 -- !query 22 schema
-struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int>
+struct<a:int,a:int>
 -- !query 22 output
 0	0
 1	1
 -240,10 +236,10  struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS IN

 -- !query 23
-SELECT udf(udf(a)/2), udf(udf(a)/2) FROM test_missing_target
-	ORDER BY udf(udf(a)/2)
+SELECT a/2, a/2 FROM test_missing_target
+	ORDER BY a/2
 -- !query 23 schema
-struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int,CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int>
+struct<(a div 2):int,(a div 2):int>
 -- !query 23 output
 0	0
 0	0
 -258,10 +254,10  struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS

 -- !query 24
-SELECT udf(a/2), udf(a/2) FROM test_missing_target
-	GROUP BY udf(a/2) ORDER BY udf(a/2)
+SELECT a/2, a/2 FROM test_missing_target
+	GROUP BY a/2 ORDER BY a/2
 -- !query 24 schema
-struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) as string)) AS INT):int>
+struct<(a div 2):int,(a div 2):int>
 -- !query 24 output
 0	0
 1	1
 -271,11 +267,11  struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) a

 -- !query 25
-SELECT udf(x.b), udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b) ORDER BY udf(x.b)
+SELECT x.b, count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b ORDER BY x.b
 -- !query 25 schema
-struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<b:int,count(1):bigint>
 -- !query 25 output
 1	1
 2	2
 -284,11 +280,11  struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)

 -- !query 26
-SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b) ORDER BY udf(x.b)
+SELECT count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b ORDER BY x.b
 -- !query 26 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 26 output
 1
 2
 -297,22 +293,22  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 27
-SELECT udf(a%2), udf(count(udf(b))) FROM test_missing_target
-GROUP BY udf(test_missing_target.a%2)
-ORDER BY udf(test_missing_target.a%2)
+SELECT a%2, count(b) FROM test_missing_target
+GROUP BY test_missing_target.a%2
+ORDER BY test_missing_target.a%2
 -- !query 27 schema
-struct<CAST(udf(cast((a % 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
+struct<(a % 2):int,count(b):bigint>
 -- !query 27 output
 0	5
 1	5

 -- !query 28
-SELECT udf(count(c)) FROM test_missing_target
-GROUP BY udf(lower(test_missing_target.c))
-ORDER BY udf(lower(test_missing_target.c))
+SELECT count(c) FROM test_missing_target
+GROUP BY lower(test_missing_target.c)
+ORDER BY lower(test_missing_target.c)
 -- !query 28 schema
-struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint>
+struct<count(c):bigint>
 -- !query 28 output
 2
 3
 -321,18 +317,18  struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint>

 -- !query 29
-SELECT udf(count(udf(a))) FROM test_missing_target GROUP BY udf(a) ORDER BY udf(b)
+SELECT count(a) FROM test_missing_target GROUP BY a ORDER BY b
 -- !query 29 schema
 struct<>
 -- !query 29 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [CAST(udf(cast(count(cast(udf(cast(a as string)) as int)) as string)) AS BIGINT)]; line 1 pos 80
+cannot resolve '`b`' given input columns: [count(a)]; line 1 pos 61

 -- !query 30
-SELECT udf(count(b)) FROM test_missing_target GROUP BY udf(b/2) ORDER BY udf(b/2)
+SELECT count(b) FROM test_missing_target GROUP BY b/2 ORDER BY b/2
 -- !query 30 schema
-struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
+struct<count(b):bigint>
 -- !query 30 output
 1
 5
 -340,10 +336,10  struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>

 -- !query 31
-SELECT udf(lower(test_missing_target.c)), udf(count(udf(c)))
-  FROM test_missing_target GROUP BY udf(lower(c)) ORDER BY udf(lower(c))
+SELECT lower(test_missing_target.c), count(c)
+  FROM test_missing_target GROUP BY lower(c) ORDER BY lower(c)
 -- !query 31 schema
-struct<CAST(udf(cast(lower(c) as string)) AS STRING):string,CAST(udf(cast(count(cast(udf(cast(c as string)) as string)) as string)) AS BIGINT):bigint>
+struct<lower(c):string,count(c):bigint>
 -- !query 31 output
 abab	2
 bbbb	3
 -352,9 +348,9  xxxx	1

 -- !query 32
-SELECT udf(a) FROM test_missing_target ORDER BY udf(upper(udf(d)))
+SELECT a FROM test_missing_target ORDER BY upper(d)
 -- !query 32 schema
-struct<CAST(udf(cast(a as string)) AS INT):int>
+struct<a:int>
 -- !query 32 output
 0
 1
 -369,33 +365,32  struct<CAST(udf(cast(a as string)) AS INT):int>

 -- !query 33
-SELECT udf(count(b)) FROM test_missing_target
-	GROUP BY udf((b + 1) / 2) ORDER BY udf((b + 1) / 2) desc
+SELECT count(b) FROM test_missing_target
+	GROUP BY (b + 1) / 2 ORDER BY (b + 1) / 2 desc
 -- !query 33 schema
-struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
+struct<count(b):bigint>
 -- !query 33 output
 7
 3

 -- !query 34
-SELECT udf(count(udf(x.a))) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(b/2) ORDER BY udf(b/2)
+SELECT count(x.a) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY b/2 ORDER BY b/2
 -- !query 34 schema
 struct<>
 -- !query 34 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 14
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 35
-SELECT udf(x.b/2), udf(count(udf(x.b))) FROM test_missing_target x,
-test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b/2) ORDER BY udf(x.b/2)
+SELECT x.b/2, count(x.b) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b/2 ORDER BY x.b/2
 -- !query 35 schema
-struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
+struct<(b div 2):int,count(b):bigint>
 -- !query 35 output
 0	1
 1	5
 -403,14 +398,14  struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(

 -- !query 36
-SELECT udf(count(udf(b))) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b/2)
+SELECT count(b) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b/2
 -- !query 36 schema
 struct<>
 -- !query 36 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 21
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 13

 -- !query 37
```

</p>
</details>

## How was this patch tested?
Tested as Guided in SPARK-27921

Closes #25350 from Udbhav30/master.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:14:32 +09:00
HyukjinKwon da3d4b6a35 [SPARK-28537][SQL][HOTFIX][FOLLOW-UP] Add supportColumnar in DebugExec
## What changes were proposed in this pull request?

This PR add supportColumnar in DebugExec. Seems there was a conflict between https://github.com/apache/spark/pull/25274 and https://github.com/apache/spark/pull/25264

Currently tests are broken in Jenkins:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108687/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108688/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108693/

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: ColumnarToRow +- InMemoryTableScan [id#356956L]       +- InMemoryRelation [id#356956L], StorageLevel(disk, memory, deserialized, 1 replicas)             +- *(1) Range (0, 5, step=1, splits=2)
Stacktrace
sbt.ForkMain$ForkError: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
ColumnarToRow
+- InMemoryTableScan [id#356956L]
      +- InMemoryRelation [id#356956L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 5, step=1, splits=2)

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:431)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:323)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
```

## How was this patch tested?

Manually tested the failed test.

Closes #25365 from HyukjinKwon/SPARK-28537.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:08:15 +09:00
Stavros Kontopoulos 4a2c662315 [SPARK-27921][PYTHON][SQL][TESTS][FOLLOW-UP] Add UDF cases into group by clause in 'udf-group-analytics.sql'
## What changes were proposed in this pull request?

This PR is a followup of a fix as described in here: #25215 (comment)
<details><summary>Diff comparing to 'group-analytics.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
index 3439a05727..de297ab166 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
 -13,9 +13,9  struct<>

 -- !query 1
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH CUBE
+SELECT udf(a + b), b, udf(SUM(a - b)) FROM testData GROUP BY udf(a + b), b WITH CUBE
 -- !query 1 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast((a - b) as bigint)) as string)) AS BIGINT):bigint>
 -- !query 1 output
 2	1	0
 2	NULL	0
 -33,9 +33,9  NULL	NULL	3

 -- !query 2
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH CUBE
+SELECT udf(a), udf(b), SUM(b) FROM testData GROUP BY udf(a), b WITH CUBE
 -- !query 2 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(b as string)) AS INT):int,sum(b):bigint>
 -- !query 2 output
 1	1	1
 1	2	2
 -52,9 +52,9  NULL	NULL	9

 -- !query 3
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
+SELECT udf(a + b), b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
 -- !query 3 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,sum((a - b)):bigint>
 -- !query 3 output
 2	1	0
 2	NULL	0
 -70,9 +70,9  NULL	NULL	3

 -- !query 4
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP
+SELECT udf(a), b, udf(SUM(b)) FROM testData GROUP BY udf(a), b WITH ROLLUP
 -- !query 4 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast(b as bigint)) as string)) AS BIGINT):bigint>
 -- !query 4 output
 1	1	1
 1	2	2
 -97,7 +97,7  struct<>

 -- !query 6
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY udf(course), year
 -- !query 6 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 6 output
 -111,7 +111,7  dotNET	2013	48000

 -- !query 7
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, udf(year)
 -- !query 7 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 7 output
 -127,9 +127,9  dotNET	2013	48000

 -- !query 8
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
+SELECT course, udf(year), SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
 -- !query 8 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,CAST(udf(cast(year as string)) AS INT):int,sum(earnings):bigint>
 -- !query 8 output
 Java	NULL	50000
 NULL	2012	35000
 -138,26 +138,26  dotNET	NULL	63000

 -- !query 9
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course)
+SELECT course, year, udf(SUM(earnings)) FROM courseSales GROUP BY course, year GROUPING SETS(course)
 -- !query 9 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,year:int,CAST(udf(cast(sum(cast(earnings as bigint)) as string)) AS BIGINT):bigint>
 -- !query 9 output
 Java	NULL	50000
 dotNET	NULL	63000

 -- !query 10
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
+SELECT udf(course), year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
 -- !query 10 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int,sum(earnings):bigint>
 -- !query 10 output
 NULL	2012	35000
 NULL	2013	78000

 -- !query 11
-SELECT course, SUM(earnings) AS sum FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+SELECT course, udf(SUM(earnings)) AS sum FROM courseSales
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, udf(sum)
 -- !query 11 schema
 struct<course:string,sum:bigint>
 -- !query 11 output
 -173,7 +173,7  dotNET	63000

 -- !query 12
 SELECT course, SUM(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY udf(course), sum
 -- !query 12 schema
 struct<course:string,sum:bigint,grouping_id(course, earnings):int>
 -- !query 12 output
 -188,10 +188,10  dotNET	63000	1

 -- !query 13
-SELECT course, year, GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
+SELECT udf(course), udf(year), GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
 GROUP BY CUBE(course, year)
 -- !query 13 schema
-struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,CAST(udf(cast(year as string)) AS INT):int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
 -- !query 13 output
 Java	2012	0	0	0
 Java	2013	0	0	0
 -205,7 +205,7  dotNET	NULL	0	1	1

 -- !query 14
-SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING(course) FROM courseSales GROUP BY course, udf(year)
 -- !query 14 schema
 struct<>
 -- !query 14 output
 -214,7 +214,7  grouping() can only be used with GroupingSets/Cube/Rollup;

 -- !query 15
-SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING_ID(course, year) FROM courseSales GROUP BY udf(course), year
 -- !query 15 schema
 struct<>
 -- !query 15 output
 -223,7 +223,7  grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 16
-SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, udf(year)
 -- !query 16 schema
 struct<course:string,year:int,grouping__id:int>
 -- !query 16 output
 -240,7 +240,7  NULL	NULL	3

 -- !query 17
 SELECT course, year FROM courseSales GROUP BY CUBE(course, year)
-HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, year
+HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, udf(year)
 -- !query 17 schema
 struct<course:string,year:int>
 -- !query 17 output
 -250,7 +250,7  dotNET	NULL

 -- !query 18
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0
+SELECT course, udf(year) FROM courseSales GROUP BY udf(course), year HAVING GROUPING(course) > 0
 -- !query 18 schema
 struct<>
 -- !query 18 output
 -259,7 +259,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 19
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
+SELECT course, udf(udf(year)) FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
 -- !query 19 schema
 struct<>
 -- !query 19 output
 -268,9 +268,9  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 20
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
+SELECT udf(course), year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
 -- !query 20 schema
-struct<course:string,year:int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int>
 -- !query 20 output
 Java	NULL
 NULL	2012
 -281,7 +281,7  dotNET	NULL

 -- !query 21
 SELECT course, year, GROUPING(course), GROUPING(year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 21 schema
 struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint>
 -- !query 21 output
 -298,7 +298,7  NULL	NULL	1	1

 -- !query 22
 SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 22 schema
 struct<course:string,year:int,grouping_id(course, year):int>
 -- !query 22 output
 -314,7 +314,7  NULL	NULL	3

 -- !query 23
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING(course)
 -- !query 23 schema
 struct<>
 -- !query 23 output
 -323,7 +323,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 24
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING_ID(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING_ID(course)
 -- !query 24 schema
 struct<>
 -- !query 24 output
 -332,7 +332,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 25
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, udf(course), year
 -- !query 25 schema
 struct<course:string,year:int>
 -- !query 25 output
 -348,7 +348,7  NULL	NULL

 -- !query 26
-SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
+SELECT udf(a + b) AS k1, udf(b) AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
 -- !query 26 schema
 struct<k1:int,k2:int,sum((a - b)):bigint>
 -- !query 26 output
 -368,7 +368,7  NULL	NULL	3

 -- !query 27
-SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
+SELECT udf(udf(a + b)) AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
 -- !query 27 schema
 struct<k:int,b:int,sum((a - b)):bigint>
 -- !query 27 output
 -386,9 +386,9  NULL	NULL	3

 -- !query 28
-SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
+SELECT udf(a + b), udf(udf(b)) AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
 -- !query 28 schema
-struct<(a + b):int,k:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,k:int,sum((a - b)):bigint>
 -- !query 28 output
 NULL	1	3
 NULL	2	0

```

</p>
</details>

## How was this patch tested?
Tested as instructed in SPARK-27921.

Closes #25362 from skonto/group-analytics-followup.

Authored-by: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:00:28 +09:00
Jungtaek Lim (HeartSaVioR) 128ea37bda [SPARK-28601][CORE][SQL] Use StandardCharsets.UTF_8 instead of "UTF-8" string representation, and get rid of UnsupportedEncodingException
## What changes were proposed in this pull request?

This patch tries to keep consistency whenever UTF-8 charset is needed, as using `StandardCharsets.UTF_8` instead of using "UTF-8". If the String type is needed, `StandardCharsets.UTF_8.name()` is used.

This change also brings the benefit of getting rid of `UnsupportedEncodingException`, as we're providing `Charset` instead of `String` whenever possible.

This also changes some private Catalyst helper methods to operate on encodings as `Charset` objects rather than strings.

## How was this patch tested?

Existing unit tests.

Closes #25335 from HeartSaVioR/SPARK-28601.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-05 20:45:54 -07:00
Wenchen Fan 03e3006312 [SPARK-28213][SQL][FOLLOWUP] code cleanup and bug fix for columnar execution framework
## What changes were proposed in this pull request?

I did a post-hoc review of https://github.com/apache/spark/pull/25008 , and would like to propose some cleanups/fixes/improvements:

1. Do not track the scanTime metrics in `ColumnarToRowExec`. This metrics is specific to file scan, and doesn't make sense for a general batch-to-row operator.
2. Because of 2, we need to track scanTime when building RDDs in the file scan node.
3. use `RDD#mapPartitionsInternal` instead of `flatMap` in several places, as `mapPartitionsInternal` is created for Spark SQL and we use it in almost all the SQL operators.
4. Add `limitNotReachedCond` in `ColumnarToRowExec`. This was in the `ColumnarBatchScan` before and is critical for performance.
5. Clear the relationship between codegen stage and columnar stage. The whole-stage-codegen framework is completely row-based, so these 2 kinds of stages can NEVER overlap. When they are adjacent, it's either a `RowToColumnarExec` above `WholeStageExec`, or a `ColumnarToRowExec` above the `InputAdapter`.
6. Reuse the `ColumnarBatch` in `RowToColumnarExec`. We don't need to create a new one every time, just need to reset it.
7. Do not skip testing full scan node in `LogicalPlanTagInSparkPlanSuite`
8. Add back the removed tests in `WholeStageCodegenSuite`.

## How was this patch tested?

existing tests

Closes #25264 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:11:18 +08:00
Wenchen Fan 6fb79af48c [SPARK-28344][SQL] detect ambiguous self-join and fail the query
## What changes were proposed in this pull request?

This is an alternative solution of https://github.com/apache/spark/pull/24442 . It fails the query if ambiguous self join is detected, instead of trying to disambiguate it. The problem is that, it's hard to come up with a reasonable rule to disambiguate, the rule proposed by #24442 is mostly a heuristic.

### background of the self-join problem:
This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:
```
val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result
```
The root cause is, `Dataset.apply` is so powerful that users think it returns a column reference which can point to the column of the Dataset at anywhere. This is not true in many cases. `Dataset.apply` returns an `AttributeReference` . Different Datasets may share the same `AttributeReference`. In the example above, `df2` adds a Filter operator above the logical plan of `df1`, and the Filter operator reserves the output `AttributeReference` of its child. This means, `df1("a")` is exactly the same as `df2("a")`, and `df1("a") > df2("a")` always evaluates to false.

### The rule to detect ambiguous column reference caused by self join:
We can reuse the infra in #24442 :
1. each Dataset has a globally unique id.
2. the `AttributeReference` returned by `Dataset.apply` carries the ID and column position(e.g. 3rd column of the Dataset) via metadata.
3. the logical plan of a `Dataset` carries the ID via `TreeNodeTag`

When self-join happens, the analyzer asks the right side plan of join to re-generate output attributes with new exprIds. Based on it, a simple rule to detect ambiguous self join is:
1. find all column references (i.e. `AttributeReference`s with Dataset ID and col position) in the root node of a query plan.
2. for each column reference, traverse the query plan tree, find a sub-plan that carries Dataset ID and the ID is the same as the one in the column reference.
3. get the corresponding output attribute of the sub-plan by the col position in the column reference.
4. if the corresponding output attribute has a different exprID than the column reference, then it means this sub-plan is on the right side of a self-join and has regenerated its output attributes. This is an ambiguous self join because the column reference points to a table being self-joined.

## How was this patch tested?

existing tests and new test cases

Closes #25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:06:36 +08:00
Kousuke Saruta 794804ea5e [SPARK-28537][SQL] DebugExec cannot debug broadcast or columnar related queries
DebugExec does not implement doExecuteBroadcast and doExecuteColumnar so we can't debug broadcast or columnar related query.

One example for broadcast is here.
```
val df1 = Seq(1, 2, 3).toDF
val df2 = Seq(1, 2, 3).toDF
val joined = df1.join(df2, df1("value") === df2("value"))
joined.debug()

java.lang.UnsupportedOperationException: Debug does not implement doExecuteBroadcast
...
```

Another for columnar is here.
```
val df = Seq(1, 2, 3).toDF
df.persist
df.debug()

java.lang.IllegalStateException: Internal Error class org.apache.spark.sql.execution.debug.package$DebugExec has column support mismatch:
...
```

## How was this patch tested?

Additional test cases in DebuggingSuite.

Closes #25274 from sarutak/fix-debugexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-06 08:26:51 +09:00
wuyi 94499af6f0 [SPARK-28486][CORE][PYTHON] Map PythonBroadcast's data file to a BroadcastBlock to avoid delete by GC
## What changes were proposed in this pull request?

Currently, PythonBroadcast may delete its data file while a python worker still needs it. This happens because PythonBroadcast overrides the `finalize()` method to delete its data file. So, when GC happens and no  references on broadcast variable, it may trigger `finalize()` to delete
data file. That's also means, data under python Broadcast variable couldn't be deleted when `unpersist()`/`destroy()` called but relys on GC.

In this PR, we removed the `finalize()` method, and map the PythonBroadcast data file to a BroadcastBlock(which has the same broadcast id with the broadcast variable who wrapped this PythonBroadcast) when PythonBroadcast is deserializing. As a result, the data file could be deleted just like other pieces of the Broadcast variable when `unpersist()`/`destroy()` called and do not rely on GC any more.

## How was this patch tested?

Added a Python test, and tested manually(verified create/delete the broadcast block).

Closes #25262 from Ngone51/SPARK-28486.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-05 20:18:53 +09:00
John Zhuge cae500a255 [SPARK-28178][SQL][FOLLOWUP] DataSourceV2: DataFrameWriter.insertInfo
## What changes were proposed in this pull request?

- DataFrameWriter.insertInto should match column names by position.
- Clean up test cases.

## How was this patch tested?

New tests:
- insertInto: append by position
- insertInto: overwrite partitioned table in static mode by position
- insertInto: overwrite partitioned table in dynamic mode by position

Closes #25353 from jzhuge/SPARK-28178-bypos.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-05 16:05:23 +08:00
Yuanjian Li db39f45baf [SPARK-28593][CORE] Rename ShuffleClient to BlockStoreClient which more close to its usage
## What changes were proposed in this pull request?

After SPARK-27677, the shuffle client not only handles the shuffle block but also responsible for local persist RDD blocks. For better code scalability and precise semantics(as the [discussion](https://github.com/apache/spark/pull/24892#discussion_r300173331)), here we did several changes:

- Rename ShuffleClient to BlockStoreClient.
- Correspondingly rename the ExternalShuffleClient to ExternalBlockStoreClient, also change the server-side class from ExternalShuffleBlockHandler to ExternalBlockHandler.
- Move MesosExternalBlockStoreClient to Mesos package.

Note, we still keep the name of BlockTransferService, because the `Service` contains both client and server, also the name of BlockTransferService is not referencing shuffle client only.

## How was this patch tested?

Existing UT.

Closes #25327 from xuanyuanking/SPARK-28593.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-05 14:54:45 +08:00
Ryan Blue 0345f1174d [SPARK-27661][SQL] Add SupportsNamespaces API
## What changes were proposed in this pull request?

This adds an interface for catalog plugins that exposes namespace operations:
* `listNamespaces`
* `namespaceExists`
* `loadNamespaceMetadata`
* `createNamespace`
* `alterNamespace`
* `dropNamespace`

## How was this patch tested?

API only. Existing tests for regressions.

Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
2019-08-04 21:29:40 -07:00
Dongjoon Hyun ae08387b4c [SPARK-28616][INFRA] Improve merge-spark-pr script to warn WIP PRs and strip trailing dots
## What changes were proposed in this pull request?

This PR aims to improve the `merge-spark-pr` script in the following two ways.
1. `[WIP]` is useful when we show that a PR is not ready for merge. Apache Spark allows merging `WIP` PRs. However, sometime, we accidentally forgot to clean up the title for the completed PRs. We had better warn once more during merging stage and get a confirmation from the committers.
2. We have two kinds of PR titles in terms of the ending period. This PR aims to remove the trailing `dot` since the shorter is the better in the commit title. Also, the PR titles without the trailing `dot` is dominant in the Apache Spark commit logs.
```
$ git log --oneline | grep '[.]$' | wc -l
    4090
$ git log --oneline | grep '[^.]$' | wc -l
   20747
```

## How was this patch tested?

Manual.
```
$ dev/merge_spark_pr.py
git rev-parse --abbrev-ref HEAD
Which pull request would you like to merge? (e.g. 34): 25157

The PR title has `[WIP]`:
[WIP][SPARK-28396][SQL] Add PathCatalog for data source V2
Continue? (y/n):
```

```
$ dev/merge_spark_pr.py
git rev-parse --abbrev-ref HEAD
Which pull request would you like to merge? (e.g. 34): 25304
I've re-written the title as follows to match the standard format:
Original: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
Modified: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API
Would you like to use the modified title? (y/n):
```

Closes #25356 from dongjoon-hyun/SPARK-28616.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 21:23:54 -07:00
Yuming Wang 21a18c6490 [SPARK-28614][SQL][TEST] Do not remove leading write space in the golden result file
## What changes were proposed in this pull request?

It's hard to know if the query needs to be sorted like [`SQLQueryTestSuite.isSorted`](2ecc39c8d3/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala (L375-L380)) when building a test framework for Thriftserver. So we  can sort both  the `outputs` and  the `expectedOutputs. However, we removed leading write space in the golden result file. This can lead to inconsistent results.
This PR makes it does not remove leading write space in the golden result file. Trailing write space still needs to be removed.

## How was this patch tested?

N/A

Closes #25351 from wangyum/SPARK-28614.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 17:43:31 -07:00
Sean Owen c09675779b [SPARK-28604][ML] Use log1p(x) over log(1+x) and expm1(x) over exp(x)-1 for accuracy
## What changes were proposed in this pull request?

Use `log1p(x)` over `log(1+x)` and `expm1(x)` over `exp(x)-1` for accuracy, where possible. This should improve accuracy a tiny bit in ML-related calculations, and shouldn't hurt in any event.

## How was this patch tested?

Existing tests.

Closes #25337 from srowen/SPARK-28604.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-04 17:04:01 -05:00
Dongjoon Hyun 4856c0e33a [SPARK-28609][DOC] Fix broken styles/links and make up-to-date
## What changes were proposed in this pull request?

This PR aims to fix the broken styles/links and make the doc up-to-date for Apache Spark 2.4.4 and 3.0.0 release.

- `building-spark.md`
![Screen Shot 2019-08-02 at 10 33 51 PM](https://user-images.githubusercontent.com/9700541/62407962-a248ec80-b575-11e9-8a16-532e9bc421f8.png)

- `configuration.md`
![Screen Shot 2019-08-02 at 10 34 52 PM](https://user-images.githubusercontent.com/9700541/62407969-c7d5f600-b575-11e9-9b1a-a76c6cc095c5.png)

- `sql-pyspark-pandas-with-arrow.md`
![Screen Shot 2019-08-02 at 10 36 14 PM](https://user-images.githubusercontent.com/9700541/62407979-18e5ea00-b576-11e9-99af-7ad9264656ae.png)

- `streaming-programming-guide.md`
![Screen Shot 2019-08-02 at 10 37 11 PM](https://user-images.githubusercontent.com/9700541/62407981-213e2500-b576-11e9-8bc5-a925df7e98a7.png)

- `structured-streaming-programming-guide.md` (1/2)
![Screen Shot 2019-08-02 at 10 38 20 PM](https://user-images.githubusercontent.com/9700541/62408001-49c61f00-b576-11e9-9519-f699775ceecd.png)

- `structured-streaming-programming-guide.md` (2/2)
![Screen Shot 2019-08-02 at 10 40 05 PM](https://user-images.githubusercontent.com/9700541/62408017-7f6b0800-b576-11e9-9341-52664ba6b460.png)

- `submitting-applications.md`
![Screen Shot 2019-08-02 at 10 41 13 PM](https://user-images.githubusercontent.com/9700541/62408027-b2ad9700-b576-11e9-910e-8f22173e1251.png)

## How was this patch tested?

Manual. Build the doc.
```
SKIP_API=1 jekyll build
```

Closes #25345 from dongjoon-hyun/SPARK-28609.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 09:42:47 -07:00
WeichenXu b3394db193 [SPARK-28582][PYTHON] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

This PR picks up https://github.com/apache/spark/pull/25315 back after removing `Popen.wait` usage which exists in Python 3 only. I saw the last test results wrongly and thought it was passed.

Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed

Closes #25343 from HyukjinKwon/SPARK-28582.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-03 10:31:15 +09:00
Dongjoon Hyun 0c6874fb37 [SPARK-28606][INFRA] Update CRAN key to recover docker image generation
## What changes were proposed in this pull request?

CRAN repo changed the key and it causes our release script failure. This is a release blocker for Apache Spark 2.4.4 and 3.0.0.
- https://cran.r-project.org/bin/linux/ubuntu/README.html
```
Err:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease
  The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 51716619E084DAB9
...
W: GPG error: https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease: The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 51716619E084DAB9
E: The repository 'https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease' is not signed.
```

Note that they are reusing `cran35` for R 3.6 although they changed the key.
```
Even though R has moved to version 3.6, for compatibility the sources.list entry still uses the cran3.5 designation.
```

This PR aims to recover the docker image generation first. We will verify the R doc generation in a separate JIRA and PR.

## How was this patch tested?

Manual. After `docker-build.log`, it should continue to the next stage, `Building v3.0.0-rc1`.
```
$ dev/create-release/do-release-docker.sh -d /tmp/spark-3.0.0 -n -s docs
...
Log file: docker-build.log
Building v3.0.0-rc1; output will be at /tmp/spark-3.0.0/output
```

Closes #25339 from dongjoon-hyun/SPARK-28606.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-08-02 23:41:00 +00:00
yunzoud c212c9d9ed
[SPARK-28574][CORE] Allow to config different sizes for event queues
## What changes were proposed in this pull request?
Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue size.

## How was this patch tested?
Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Closes #25307 from yunzoud/SPARK-28574.

Authored-by: yunzoud <yun.zou@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2019-08-02 15:27:33 -07:00
Xiao Li 10d4ffd577 [SPARK-28532][SPARK-28530][SQL][FOLLOWUP] Inline doc for FixedPoint(1) batches "Subquery" and "Join Reorder"
## What changes were proposed in this pull request?
Explained why "Subquery" and "Join Reorder" optimization batches should be `FixedPoint(1)`, which was introduced in SPARK-28532 and SPARK-28530.

## How was this patch tested?

Existing UTs.

Closes #25320 from yeshengm/SPARK-28530-followup.

Lead-authored-by: Xiao Li <gatorsmile@gmail.com>
Co-authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-02 14:23:41 -07:00
Dongjoon Hyun 8ae032d78d Revert "[SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7"
This reverts commit fbeee0c5bc.
2019-08-02 10:14:20 -07:00
Jungtaek Lim (HeartSaVioR) 7ffc00ccc3 [MINOR][DOC][SS] Correct description of minPartitions in Kafka option
## What changes were proposed in this pull request?

`minPartitions` has been used as a hint and relevant method (KafkaOffsetRangeCalculator.getRanges) doesn't guarantee the behavior that partitions will be equal or more than given value.

d67b98ea01/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala (L32-L46)

This patch makes clear the configuration is a hint, and actual partitions could be less or more.

## How was this patch tested?

Just a documentation change.

Closes #25332 from HeartSaVioR/MINOR-correct-kafka-structured-streaming-doc-minpartition.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-02 09:12:54 -07:00
Sean Owen b148bd5ccb [SPARK-28519][SQL] Use StrictMath log, pow functions for platform independence
## What changes were proposed in this pull request?

See discussion on the JIRA (and dev). At heart, we find that math.log and math.pow can actually return slightly different results across platforms because of hardware optimizations. For the actual SQL log and pow functions, I propose that we should use StrictMath instead to ensure the answers are already the same. (This should have the benefit of helping tests pass on aarch64.)

Further, the atanh function (which is not part of java.lang.Math) can be implemented in a slightly different and more accurate way.

## How was this patch tested?

Existing tests (which will need to be changed).
Some manual testing locally to understand the numeric issues.

Closes #25279 from srowen/SPARK-28519.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:55:44 -05:00
actuaryzhang 6d7a6751d8 [SPARK-20604][ML] Allow imputer to handle numeric types
## What changes were proposed in this pull request?

Imputer currently requires input column to be Double or Float, but the logic should work on any numeric data types. Many practical problems have integer  data types, and it could get very tedious to manually cast them into Double before calling imputer. This transformer could be extended to handle all numeric types.

## How was this patch tested?
new test

Closes #17864 from actuaryzhang/imputer.

Lead-authored-by: actuaryzhang <actuaryzhang10@gmail.com>
Co-authored-by: Wayne Zhang <actuaryzhang@uber.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:54:50 -05:00
Huaxin Gao 660423d717 [SPARK-23469][ML] HashingTF should use corrected MurmurHash3 implementation
## What changes were proposed in this pull request?

Update HashingTF to use new implementation of MurmurHash3
Make HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded

## How was this patch tested?

Change existing unit tests. Also add one unit test to make sure HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded

Closes #25303 from huaxingao/spark-23469.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:53:36 -05:00
Yuming Wang efd92993f4 [SPARK-28510][SQL] Implement Spark's own GetFunctionsOperation
## What changes were proposed in this pull request?

This PR implements Spark's own GetFunctionsOperation which mitigates the differences between Spark SQL and Hive UDFs. But our implementation is different from Hive's implementation:
- Our implementation always returns results. Hive only returns results when [(null == catalogName || "".equals(catalogName)) && (null == schemaName || "".equals(schemaName))](https://github.com/apache/hive/blob/rel/release-3.1.1/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java#L101-L119).
- Our implementation pads the `REMARKS` field with the function usage - Hive returns an empty string.
- Our implementation does not support `FUNCTION_TYPE`, but Hive does.

## How was this patch tested?

unit tests

Closes #25252 from wangyum/SPARK-28510.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-02 08:50:42 -07:00
zhengruifeng 37243e160e [SPARK-28579][ML] MaxAbsScaler avoids conversion to breeze.vector
## What changes were proposed in this pull request?
avoid `.ml.vector => .breeze.vector` conversion in `MaxAbsScaler`,
and reuse the transformation method in `StandardScalerModel`, which can deal with dense & sparse vector separately.

## How was this patch tested?
existing suites

Closes #25311 from zhengruifeng/maxabs_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:38:10 -05:00
WeichenXu fbeee0c5bc [SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed

Closes #25315 from WeichenXu123/fix_py37_daemon.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 22:07:06 +09:00
Liang-Chi Hsieh 77c7e91e02 [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression
## What changes were proposed in this pull request?

When PythonUDF is used in group by, and it is also in aggregate expression, like

```
SELECT pyUDF(a + 1), COUNT(b) FROM testData GROUP BY pyUDF(a + 1)
```

It causes analysis exception in `CheckAnalysis`, like
```
org.apache.spark.sql.AnalysisException: expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function.
```

First, `CheckAnalysis` can't check semantic equality between PythonUDFs.
Second, even we make it possible, runtime exception will be thrown

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF1#8615
...
Cause: java.lang.RuntimeException: Couldn't find pythonUDF1#8615 in [cast(pythonUDF0#8614 as int)#8617,count(b#8599)#8607L]
```

The cause is, `ExtractPythonUDFs` extracts both PythonUDFs in group by and aggregate expression. The PythonUDFs are two different aliases now in the logical aggregate. In runtime, we can't bind the resulting expression in aggregate to its grouping and aggregate attributes.

This patch proposes a rule `ExtractGroupingPythonUDFFromAggregate` to extract PythonUDFs in group by and evaluate them before aggregate. We replace the group by PythonUDF in aggregate expression with aliased result.

The query plan of query `SELECT pyUDF(a + 1), pyUDF(COUNT(b)) FROM testData GROUP BY pyUDF(a + 1)`, like

```
== Optimized Logical Plan ==
Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- Aggregate [cast(groupingPythonUDF#8614 as int)], [cast(groupingPythonUDF#8614 as int) AS CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, count(b#8599) AS agg#8613L]
      +- Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
         +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
            +- LocalRelation [a#8598, b#8599]

== Physical Plan ==
*(3) Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- *(2) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int)#8617], functions=[count(b#8599)], output=[CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, agg#8613L])
      +- Exchange hashpartitioning(cast(groupingPythonUDF#8614 as int)#8617, 5), true
         +- *(1) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int) AS cast(groupingPythonUDF#8614 as int)#8617], functions=[partial_count(b#8599)], output=[cast(groupingPythonUDF#8614 as int)#8617, count#8619L])
            +- *(1) Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
               +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
                  +- LocalTableScan [a#8598, b#8599]
```

## How was this patch tested?

Added tests.

Closes #25215 from viirya/SPARK-28445.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 19:47:29 +09:00
Nick Karpov 6d32deeecc [SPARK-28475][CORE] Add regex MetricFilter to GraphiteSink
## What changes were proposed in this pull request?

Today all registered metric sources are reported to GraphiteSink with no filtering mechanism, although the codahale project does support it.

GraphiteReporter (ScheduledReporter) from the codahale project requires you implement and supply the MetricFilter interface (there is only a single implementation by default in the codahale project, MetricFilter.ALL).

Propose to add an additional regex config to match and filter metrics to the GraphiteSink

## How was this patch tested?

Included a GraphiteSinkSuite that tests:

1. Absence of regex filter (existing default behavior maintained)
2. Presence of `regex=<regexexpr>` correctly filters metric keys

Closes #25232 from nkarpov/graphite_regex.

Authored-by: Nick Karpov <nick@nickkarpov.com>
Signed-off-by: jerryshao <jerryshao@tencent.com>
2019-08-02 17:50:15 +08:00
Yuming Wang 4e7a4cd20e [SPARK-28521][SQL] Fix error message for built-in functions
## What changes were proposed in this pull request?

```sql
spark-sql> select cast(1);
19/07/26 00:54:17 ERROR SparkSQLDriver: Failed in [select cast(1)]
java.lang.UnsupportedOperationException: empty.init
	at scala.collection.TraversableLike$class.init(TraversableLike.scala:451)
	at scala.collection.mutable.ArrayOps$ofInt.scala$collection$IndexedSeqOptimized$$super$init(ArrayOps.scala:234)
	at scala.collection.IndexedSeqOptimized$class.init(IndexedSeqOptimized.scala:135)
	at scala.collection.mutable.ArrayOps$ofInt.init(ArrayOps.scala:234)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:565)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:558)
	at scala.Option.getOrElse(Option.scala:121)
```

The reason is that we did not handle the case [`validParametersCount.length == 0`](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L588)) because the [parameter types](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L589)) can be `Expression`, `DataType` and `Option`. This PR makes it  handle the case `validParametersCount.length == 0`.

## How was this patch tested?

unit tests

Closes #25261 from wangyum/SPARK-28521.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-01 18:02:50 -05:00
Marcelo Vanzin 607fb87906 [SPARK-28584][CORE] Fix thread safety issue in blacklist timer, tests
There's a small, probably very hard to hit thread-safety issue in the blacklist
abort timers in the task scheduler, where they access a non-thread-safe map without
locks.

In the tests, the code was also calling methods on the TaskSetManager without
holding the proper locks, which could cause threads to call non-thread-safe
TSM methods concurrently.

Closes #25317 from vanzin/SPARK-28584.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:37:47 -07:00
HyukjinKwon 8e1602a04f [SPARK-28568][SHUFFLE][DOCS] Make Javadoc in org.apache.spark.shuffle.api visible
## What changes were proposed in this pull request?

This PR proposes to make Javadoc in org.apache.spark.shuffle.api visible.

## How was this patch tested?

Manually built the doc and checked:

![Screen Shot 2019-08-01 at 4 48 23 PM](https://user-images.githubusercontent.com/6477701/62275587-400cc080-b47d-11e9-8fba-c4a0607093d1.png)

Closes #25323 from HyukjinKwon/SPARK-28568.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:24:29 -07:00
HyukjinKwon 24c1bc2483 [SPARK-28586][INFRA] Make merge-spark-pr script compatible with Python 3
## What changes were proposed in this pull request?

This PR proposes to make `merge_spark_pr.py` script Python 3 compatible.

## How was this patch tested?

Manually tested against my forked remote with the PR and JIRA below:

https://github.com/apache/spark/pull/25321
https://github.com/apache/spark/pull/25286
https://issues.apache.org/jira/browse/SPARK-28153

Closes #25322 from HyukjinKwon/merge-script.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-01 10:17:17 -07:00
zhengruifeng b29829e2ab [SPARK-25584][ML][DOC] datasource for libsvm user guide
## What changes were proposed in this pull request?
it seems that doc for libsvm datasource is not added in https://github.com/apache/spark/pull/22675.
This pr is to add it.

## How was this patch tested?
doc built locally
![图片](https://user-images.githubusercontent.com/7322292/62044350-4ad51480-b235-11e9-8f09-cbcbe9d3b7f9.png)

Closes #25286 from zhengruifeng/doc_libsvm_data_source.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-01 09:15:42 -05:00
Wing Yew Poon 80ab19b9fd [SPARK-26329][CORE] Faster polling of executor memory metrics.
## What changes were proposed in this pull request?

Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval.

For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.

We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`.

We continue to do the stage-level aggregation in the EventLoggingListener.

For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.

## How was this patch tested?

Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.

Closes #23767 from wypoon/wypoon_SPARK-26329.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-08-01 09:09:46 -05:00
WeichenXu 26d03b62e2 [SPARK-28366][CORE] Logging in driver when loading single large unsplittable file
## What changes were proposed in this pull request?

Logging in driver when loading single large unsplittable file via `sc.textFile` or csv/json datasouce.
Current condition triggering logging is
* only generate one partition
* file is unsplittable, possible reason is:
   - compressed by unsplittable compression algo such as gzip.
   - multiLine mode in csv/json datasource
   - wholeText mode in text datasource
* file size exceed the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)

## How was this patch tested?

Manually test.
Generate one gzip file exceeding 1GB,
```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```
then launch spark-shell,

run
```
sc.textFile("file:///path/to/file1.gz").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```

run
```
sc.textFile("file:///path/to/file1.txt").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```

run
```
spark.read.csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```

run
```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```

JSON and Text datasource also tested with similar cases.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #25134 from WeichenXu123/log_gz.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-01 20:29:18 +08:00