Commit graph

5810 commits

Author SHA1 Message Date
gengjiaan 99de6a4240 [SPARK-27924][SQL][TEST][FOLLOW-UP] Enable Boolean-Predicate syntax tests
## What changes were proposed in this pull request?

This PR is a follow-up to https://github.com/apache/spark/pull/25074

## How was this patch tested?

Pass Jenkins with the newly updated test files.

Closes #25366 from beliefer/uncomment-boolean-test.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-07 00:34:49 -07:00
mcheah 44e607e921 [SPARK-28238][SQL] Implement DESCRIBE TABLE for Data Source V2 Tables
## What changes were proposed in this pull request?

Implements the `DESCRIBE TABLE` logical and physical plans for data source v2 tables.
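
For illustration, a minimal usage sketch (the catalog and table names below are hypothetical, and a `SparkSession` named `spark` is assumed):

```scala
// Hypothetical v2 catalog/table; DESCRIBE TABLE now resolves to the new
// logical/physical plans for data source v2 tables.
spark.sql("DESCRIBE TABLE testcat.db.tbl").show(truncate = false)
```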

## How was this patch tested?

Added unit tests to `DataSourceV2SQLSuite`.

Closes #25040 from mccheah/describe-table-v2.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-07 14:26:45 +08:00
WeichenXu a133175ffa [SPARK-28615][SQL][DOCS] Add a guideline for DataFrame functions saying the Column-signature variant is the default
## What changes were proposed in this pull request?

Add a guideline for DataFrame functions, saying:
```
This function APIs usually have methods with Column signature only because it can support not only Column but also other types such as a native string. The other variants currently exist for historical reasons.
```

## How was this patch tested?

N/A

Closes #25355 from WeichenXu123/update_functions_guide2.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-07 10:39:47 +09:00
Nik Vanderhoof 9e931e787d [SPARK-27905][SQL] Add higher order function 'forall'
## What changes were proposed in this pull request?

Adds the higher-order function `forall`, which tests whether a predicate holds for every element of an array.
The function is implemented in `org.apache.spark.sql.catalyst.expressions.ArrayForAll`.
The function is added to the function registry under the pretty name `forall`.
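
For illustration, a minimal usage sketch through SQL (a `SparkSession` named `spark` is assumed):

```scala
// The second argument is a SQL lambda; forall is true only if the predicate
// holds for every element of the array.
spark.sql("SELECT forall(array(1, 2, 3), x -> x > 0)").show()   // true
spark.sql("SELECT forall(array(1, -2, 3), x -> x > 0)").show()  // false
```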

## How was this patch tested?

I've added appropriate unit tests for the new ArrayForAll expression in
`sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala`.

Also added tests for the function in `sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala`.

Closes #24761 from nvander1/feature/for_all.

Lead-authored-by: Nik Vanderhoof <nikolasrvanderhoof@gmail.com>
Co-authored-by: Nik <nikolasrvanderhoof@gmail.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2019-08-06 14:25:53 -07:00
Maxim Gekk 9e3aab8b95 [SPARK-28623][SQL] Support dow, isodow and doy by extract()
## What changes were proposed in this pull request?

In the PR, I propose to use existing expressions `DayOfYear`, `WeekDay` and `DayOfWeek`, and support additional parameters of `extract()` for feature parity with PostgreSQL (https://www.postgresql.org/docs/11/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT):

1. `dow` - the day of the week as Sunday (0) to Saturday (6)
2. `isodow` - the day of the week as Monday (1) to Sunday (7)
3. `doy` - the day of the year (1 - 365/366)

Here are examples:
```sql
spark-sql> SELECT EXTRACT(DOW FROM TIMESTAMP '2001-02-16 20:38:40');
5
spark-sql> SELECT EXTRACT(ISODOW FROM TIMESTAMP '2001-02-18 20:38:40');
7
spark-sql> SELECT EXTRACT(DOY FROM TIMESTAMP '2001-02-16 20:38:40');
47
```

## How was this patch tested?

Updated `extract.sql`.

Closes #25367 from MaxGekk/extract-ext.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-06 13:39:49 -07:00
HyukjinKwon bab88c48b1 [SPARK-28622][SQL][PYTHON] Rename PullOutPythonUDFInJoinCondition to ExtractPythonUDFFromJoinCondition and move to 'Extract Python UDFs'
## What changes were proposed in this pull request?

This PR targets to rename `PullOutPythonUDFInJoinCondition` to `ExtractPythonUDFFromJoinCondition` and move to 'Extract Python UDFs' together with other Python UDF related rules.

Currently, the `PullOutPythonUDFInJoinCondition` rule sits by itself, apart from the other 'Extract Python UDFs' rules.

Also, the new name `ExtractPythonUDFFromJoinCondition` matches the naming of the existing Python UDF extraction rules.

## How was this patch tested?

Existing tests should cover.

Closes #25358 from HyukjinKwon/move-python-join-rule.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-08-05 23:36:35 -07:00
Udbhav30 150dbc5dc2 [SPARK-28391][PYTHON][SQL][TESTS][FOLLOW-UP] Add UDF cases into groupby clause in 'pgSQL/select_implicit.sql'
## What changes were proposed in this pull request?
This PR adds UDF cases into the GROUP BY clause in 'pgSQL/select_implicit.sql'.

<details><summary>Diff comparing to 'pgSQL/select_implicit.sql'</summary>
<p>

```diff
diff --git a/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out b/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out
index 17303b2..0675820 100755
--- a/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out
+++ b/home/root1/src/spark/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out
 -91,11 +91,9  struct<>

 -- !query 11
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY
-udf(test_missing_target.c)
-ORDER BY udf(c)
+SELECT c, count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
 -- !query 11 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<c:string,count(1):bigint>
 -- !query 11 output
 ABAB	2
 BBBB	2
 -106,10 +104,9  cccc	2

 -- !query 12
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(test_missing_target.c)
-ORDER BY udf(c)
+SELECT count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
 -- !query 12 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 12 output
 2
 2
 -120,18 +117,18  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 13
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(a) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target GROUP BY a ORDER BY b
 -- !query 13 schema
 struct<>
 -- !query 13 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [CAST(udf(cast(count(1) as string)) AS BIGINT)]; line 1 pos 75
+cannot resolve '`b`' given input columns: [count(1)]; line 1 pos 61

 -- !query 14
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b
 -- !query 14 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 14 output
 1
 2
 -140,10 +137,10  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 15
-SELECT udf(test_missing_target.b), udf(count(*))
-  FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
+SELECT test_missing_target.b, count(*)
+  FROM test_missing_target GROUP BY b ORDER BY b
 -- !query 15 schema
-struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<b:int,count(1):bigint>
 -- !query 15 output
 1	1
 2	2
 -152,9 +149,9  struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)

 -- !query 16
-SELECT udf(c) FROM test_missing_target ORDER BY udf(a)
+SELECT c FROM test_missing_target ORDER BY a
 -- !query 16 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string>
+struct<c:string>
 -- !query 16 output
 XXXX
 ABAB
 -169,10 +166,9  CCCC

 -- !query 17
-SELECT udf(count(*)) FROM test_missing_target GROUP BY udf(b) ORDER BY udf(b)
-desc
+SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b desc
 -- !query 17 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 17 output
 4
 3
 -181,17 +177,17  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 18
-SELECT udf(count(*)) FROM test_missing_target ORDER BY udf(1) desc
+SELECT count(*) FROM test_missing_target ORDER BY 1 desc
 -- !query 18 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 18 output
 10

 -- !query 19
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 1 ORDER BY 1
+SELECT c, count(*) FROM test_missing_target GROUP BY 1 ORDER BY 1
 -- !query 19 schema
-struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<c:string,count(1):bigint>
 -- !query 19 output
 ABAB	2
 BBBB	2
 -202,30 +198,30  cccc	2

 -- !query 20
-SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3
+SELECT c, count(*) FROM test_missing_target GROUP BY 3
 -- !query 20 schema
 struct<>
 -- !query 20 output
 org.apache.spark.sql.AnalysisException
-GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63
+GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53

 -- !query 21
-SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(b) ORDER BY udf(b)
+SELECT count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY b ORDER BY b
 -- !query 21 schema
 struct<>
 -- !query 21 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 14
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 22
-SELECT udf(a), udf(a) FROM test_missing_target
-	ORDER BY udf(a)
+SELECT a, a FROM test_missing_target
+	ORDER BY a
 -- !query 22 schema
-struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int>
+struct<a:int,a:int>
 -- !query 22 output
 0	0
 1	1
 -240,10 +236,10  struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS IN

 -- !query 23
-SELECT udf(udf(a)/2), udf(udf(a)/2) FROM test_missing_target
-	ORDER BY udf(udf(a)/2)
+SELECT a/2, a/2 FROM test_missing_target
+	ORDER BY a/2
 -- !query 23 schema
-struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int,CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int>
+struct<(a div 2):int,(a div 2):int>
 -- !query 23 output
 0	0
 0	0
 -258,10 +254,10  struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS

 -- !query 24
-SELECT udf(a/2), udf(a/2) FROM test_missing_target
-	GROUP BY udf(a/2) ORDER BY udf(a/2)
+SELECT a/2, a/2 FROM test_missing_target
+	GROUP BY a/2 ORDER BY a/2
 -- !query 24 schema
-struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) as string)) AS INT):int>
+struct<(a div 2):int,(a div 2):int>
 -- !query 24 output
 0	0
 1	1
 -271,11 +267,11  struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) a

 -- !query 25
-SELECT udf(x.b), udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b) ORDER BY udf(x.b)
+SELECT x.b, count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b ORDER BY x.b
 -- !query 25 schema
-struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<b:int,count(1):bigint>
 -- !query 25 output
 1	1
 2	2
 -284,11 +280,11  struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)

 -- !query 26
-SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b) ORDER BY udf(x.b)
+SELECT count(*) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b ORDER BY x.b
 -- !query 26 schema
-struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
+struct<count(1):bigint>
 -- !query 26 output
 1
 2
 -297,22 +293,22  struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>

 -- !query 27
-SELECT udf(a%2), udf(count(udf(b))) FROM test_missing_target
-GROUP BY udf(test_missing_target.a%2)
-ORDER BY udf(test_missing_target.a%2)
+SELECT a%2, count(b) FROM test_missing_target
+GROUP BY test_missing_target.a%2
+ORDER BY test_missing_target.a%2
 -- !query 27 schema
-struct<CAST(udf(cast((a % 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
+struct<(a % 2):int,count(b):bigint>
 -- !query 27 output
 0	5
 1	5

 -- !query 28
-SELECT udf(count(c)) FROM test_missing_target
-GROUP BY udf(lower(test_missing_target.c))
-ORDER BY udf(lower(test_missing_target.c))
+SELECT count(c) FROM test_missing_target
+GROUP BY lower(test_missing_target.c)
+ORDER BY lower(test_missing_target.c)
 -- !query 28 schema
-struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint>
+struct<count(c):bigint>
 -- !query 28 output
 2
 3
 -321,18 +317,18  struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint>

 -- !query 29
-SELECT udf(count(udf(a))) FROM test_missing_target GROUP BY udf(a) ORDER BY udf(b)
+SELECT count(a) FROM test_missing_target GROUP BY a ORDER BY b
 -- !query 29 schema
 struct<>
 -- !query 29 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [CAST(udf(cast(count(cast(udf(cast(a as string)) as int)) as string)) AS BIGINT)]; line 1 pos 80
+cannot resolve '`b`' given input columns: [count(a)]; line 1 pos 61

 -- !query 30
-SELECT udf(count(b)) FROM test_missing_target GROUP BY udf(b/2) ORDER BY udf(b/2)
+SELECT count(b) FROM test_missing_target GROUP BY b/2 ORDER BY b/2
 -- !query 30 schema
-struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
+struct<count(b):bigint>
 -- !query 30 output
 1
 5
 -340,10 +336,10  struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>

 -- !query 31
-SELECT udf(lower(test_missing_target.c)), udf(count(udf(c)))
-  FROM test_missing_target GROUP BY udf(lower(c)) ORDER BY udf(lower(c))
+SELECT lower(test_missing_target.c), count(c)
+  FROM test_missing_target GROUP BY lower(c) ORDER BY lower(c)
 -- !query 31 schema
-struct<CAST(udf(cast(lower(c) as string)) AS STRING):string,CAST(udf(cast(count(cast(udf(cast(c as string)) as string)) as string)) AS BIGINT):bigint>
+struct<lower(c):string,count(c):bigint>
 -- !query 31 output
 abab	2
 bbbb	3
 -352,9 +348,9  xxxx	1

 -- !query 32
-SELECT udf(a) FROM test_missing_target ORDER BY udf(upper(udf(d)))
+SELECT a FROM test_missing_target ORDER BY upper(d)
 -- !query 32 schema
-struct<CAST(udf(cast(a as string)) AS INT):int>
+struct<a:int>
 -- !query 32 output
 0
 1
 -369,33 +365,32  struct<CAST(udf(cast(a as string)) AS INT):int>

 -- !query 33
-SELECT udf(count(b)) FROM test_missing_target
-	GROUP BY udf((b + 1) / 2) ORDER BY udf((b + 1) / 2) desc
+SELECT count(b) FROM test_missing_target
+	GROUP BY (b + 1) / 2 ORDER BY (b + 1) / 2 desc
 -- !query 33 schema
-struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
+struct<count(b):bigint>
 -- !query 33 output
 7
 3

 -- !query 34
-SELECT udf(count(udf(x.a))) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(b/2) ORDER BY udf(b/2)
+SELECT count(x.a) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY b/2 ORDER BY b/2
 -- !query 34 schema
 struct<>
 -- !query 34 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 14
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 35
-SELECT udf(x.b/2), udf(count(udf(x.b))) FROM test_missing_target x,
-test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b/2) ORDER BY udf(x.b/2)
+SELECT x.b/2, count(x.b) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b/2 ORDER BY x.b/2
 -- !query 35 schema
-struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
+struct<(b div 2):int,count(b):bigint>
 -- !query 35 output
 0	1
 1	5
 -403,14 +398,14  struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(

 -- !query 36
-SELECT udf(count(udf(b))) FROM test_missing_target x, test_missing_target y
-	WHERE udf(x.a) = udf(y.a)
-	GROUP BY udf(x.b/2)
+SELECT count(b) FROM test_missing_target x, test_missing_target y
+	WHERE x.a = y.a
+	GROUP BY x.b/2
 -- !query 36 schema
 struct<>
 -- !query 36 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 21
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 13

 -- !query 37
```

</p>
</details>

## How was this patch tested?
Tested as guided in SPARK-27921.

Closes #25350 from Udbhav30/master.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:14:32 +09:00
HyukjinKwon da3d4b6a35 [SPARK-28537][SQL][HOTFIX][FOLLOW-UP] Add supportColumnar in DebugExec
## What changes were proposed in this pull request?

This PR adds supportColumnar in DebugExec. It seems there was a conflict between https://github.com/apache/spark/pull/25274 and https://github.com/apache/spark/pull/25264

Currently tests are broken in Jenkins:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108687/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108688/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108693/

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: ColumnarToRow +- InMemoryTableScan [id#356956L]       +- InMemoryRelation [id#356956L], StorageLevel(disk, memory, deserialized, 1 replicas)             +- *(1) Range (0, 5, step=1, splits=2)
Stacktrace
sbt.ForkMain$ForkError: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
ColumnarToRow
+- InMemoryTableScan [id#356956L]
      +- InMemoryRelation [id#356956L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 5, step=1, splits=2)

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:431)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:323)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
```

## How was this patch tested?

Manually tested the failed test.

Closes #25365 from HyukjinKwon/SPARK-28537.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:08:15 +09:00
Stavros Kontopoulos 4a2c662315 [SPARK-27921][PYTHON][SQL][TESTS][FOLLOW-UP] Add UDF cases into group by clause in 'udf-group-analytics.sql'
## What changes were proposed in this pull request?

This PR is a follow-up to a fix described here: #25215 (comment)
<details><summary>Diff comparing to 'group-analytics.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
index 3439a05727..de297ab166 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
 -13,9 +13,9  struct<>

 -- !query 1
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH CUBE
+SELECT udf(a + b), b, udf(SUM(a - b)) FROM testData GROUP BY udf(a + b), b WITH CUBE
 -- !query 1 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast((a - b) as bigint)) as string)) AS BIGINT):bigint>
 -- !query 1 output
 2	1	0
 2	NULL	0
 -33,9 +33,9  NULL	NULL	3

 -- !query 2
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH CUBE
+SELECT udf(a), udf(b), SUM(b) FROM testData GROUP BY udf(a), b WITH CUBE
 -- !query 2 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(b as string)) AS INT):int,sum(b):bigint>
 -- !query 2 output
 1	1	1
 1	2	2
 -52,9 +52,9  NULL	NULL	9

 -- !query 3
-SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
+SELECT udf(a + b), b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP
 -- !query 3 schema
-struct<(a + b):int,b:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,sum((a - b)):bigint>
 -- !query 3 output
 2	1	0
 2	NULL	0
 -70,9 +70,9  NULL	NULL	3

 -- !query 4
-SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP
+SELECT udf(a), b, udf(SUM(b)) FROM testData GROUP BY udf(a), b WITH ROLLUP
 -- !query 4 schema
-struct<a:int,b:int,sum(b):bigint>
+struct<CAST(udf(cast(a as string)) AS INT):int,b:int,CAST(udf(cast(sum(cast(b as bigint)) as string)) AS BIGINT):bigint>
 -- !query 4 output
 1	1	1
 1	2	2
 -97,7 +97,7  struct<>

 -- !query 6
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY udf(course), year
 -- !query 6 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 6 output
 -111,7 +111,7  dotNET	2013	48000

 -- !query 7
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year
+SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, udf(year)
 -- !query 7 schema
 struct<course:string,year:int,sum(earnings):bigint>
 -- !query 7 output
 -127,9 +127,9  dotNET	2013	48000

 -- !query 8
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
+SELECT course, udf(year), SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year)
 -- !query 8 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,CAST(udf(cast(year as string)) AS INT):int,sum(earnings):bigint>
 -- !query 8 output
 Java	NULL	50000
 NULL	2012	35000
 -138,26 +138,26  dotNET	NULL	63000

 -- !query 9
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course)
+SELECT course, year, udf(SUM(earnings)) FROM courseSales GROUP BY course, year GROUPING SETS(course)
 -- !query 9 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<course:string,year:int,CAST(udf(cast(sum(cast(earnings as bigint)) as string)) AS BIGINT):bigint>
 -- !query 9 output
 Java	NULL	50000
 dotNET	NULL	63000

 -- !query 10
-SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
+SELECT udf(course), year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year)
 -- !query 10 schema
-struct<course:string,year:int,sum(earnings):bigint>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int,sum(earnings):bigint>
 -- !query 10 output
 NULL	2012	35000
 NULL	2013	78000

 -- !query 11
-SELECT course, SUM(earnings) AS sum FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+SELECT course, udf(SUM(earnings)) AS sum FROM courseSales
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, udf(sum)
 -- !query 11 schema
 struct<course:string,sum:bigint>
 -- !query 11 output
 -173,7 +173,7  dotNET	63000

 -- !query 12
 SELECT course, SUM(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales
-GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum
+GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY udf(course), sum
 -- !query 12 schema
 struct<course:string,sum:bigint,grouping_id(course, earnings):int>
 -- !query 12 output
 -188,10 +188,10  dotNET	63000	1

 -- !query 13
-SELECT course, year, GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
+SELECT udf(course), udf(year), GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales
 GROUP BY CUBE(course, year)
 -- !query 13 schema
-struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,CAST(udf(cast(year as string)) AS INT):int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int>
 -- !query 13 output
 Java	2012	0	0	0
 Java	2013	0	0	0
 -205,7 +205,7  dotNET	NULL	0	1	1

 -- !query 14
-SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING(course) FROM courseSales GROUP BY course, udf(year)
 -- !query 14 schema
 struct<>
 -- !query 14 output
 -214,7 +214,7  grouping() can only be used with GroupingSets/Cube/Rollup;

 -- !query 15
-SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY course, year
+SELECT course, udf(year), GROUPING_ID(course, year) FROM courseSales GROUP BY udf(course), year
 -- !query 15 schema
 struct<>
 -- !query 15 output
 -223,7 +223,7  grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 16
-SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, udf(year)
 -- !query 16 schema
 struct<course:string,year:int,grouping__id:int>
 -- !query 16 output
 -240,7 +240,7  NULL	NULL	3

 -- !query 17
 SELECT course, year FROM courseSales GROUP BY CUBE(course, year)
-HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, year
+HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, udf(year)
 -- !query 17 schema
 struct<course:string,year:int>
 -- !query 17 output
 -250,7 +250,7  dotNET	NULL

 -- !query 18
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0
+SELECT course, udf(year) FROM courseSales GROUP BY udf(course), year HAVING GROUPING(course) > 0
 -- !query 18 schema
 struct<>
 -- !query 18 output
 -259,7 +259,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 19
-SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
+SELECT course, udf(udf(year)) FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0
 -- !query 19 schema
 struct<>
 -- !query 19 output
 -268,9 +268,9  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 20
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
+SELECT udf(course), year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0
 -- !query 20 schema
-struct<course:string,year:int>
+struct<CAST(udf(cast(course as string)) AS STRING):string,year:int>
 -- !query 20 output
 Java	NULL
 NULL	2012
 -281,7 +281,7  dotNET	NULL

 -- !query 21
 SELECT course, year, GROUPING(course), GROUPING(year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 21 schema
 struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint>
 -- !query 21 output
 -298,7 +298,7  NULL	NULL	1	1

 -- !query 22
 SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY CUBE(course, year)
-ORDER BY GROUPING(course), GROUPING(year), course, year
+ORDER BY GROUPING(course), GROUPING(year), course, udf(year)
 -- !query 22 schema
 struct<course:string,year:int,grouping_id(course, year):int>
 -- !query 22 output
 -314,7 +314,7  NULL	NULL	3

 -- !query 23
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING(course)
 -- !query 23 schema
 struct<>
 -- !query 23 output
 -323,7 +323,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 24
-SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING_ID(course)
+SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING_ID(course)
 -- !query 24 schema
 struct<>
 -- !query 24 output
 -332,7 +332,7  grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup;

 -- !query 25
-SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year
+SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, udf(course), year
 -- !query 25 schema
 struct<course:string,year:int>
 -- !query 25 output
 -348,7 +348,7  NULL	NULL

 -- !query 26
-SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
+SELECT udf(a + b) AS k1, udf(b) AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2)
 -- !query 26 schema
 struct<k1:int,k2:int,sum((a - b)):bigint>
 -- !query 26 output
 -368,7 +368,7  NULL	NULL	3

 -- !query 27
-SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
+SELECT udf(udf(a + b)) AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b)
 -- !query 27 schema
 struct<k:int,b:int,sum((a - b)):bigint>
 -- !query 27 output
 -386,9 +386,9  NULL	NULL	3

 -- !query 28
-SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
+SELECT udf(a + b), udf(udf(b)) AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
 -- !query 28 schema
-struct<(a + b):int,k:int,sum((a - b)):bigint>
+struct<CAST(udf(cast((a + b) as string)) AS INT):int,k:int,sum((a - b)):bigint>
 -- !query 28 output
 NULL	1	3
 NULL	2	0

```

</p>
</details>

## How was this patch tested?
Tested as instructed in SPARK-27921.

Closes #25362 from skonto/group-analytics-followup.

Authored-by: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-06 15:00:28 +09:00
Jungtaek Lim (HeartSaVioR) 128ea37bda [SPARK-28601][CORE][SQL] Use StandardCharsets.UTF_8 instead of "UTF-8" string representation, and get rid of UnsupportedEncodingException
## What changes were proposed in this pull request?

This patch keeps usage consistent wherever the UTF-8 charset is needed, by using `StandardCharsets.UTF_8` instead of the string "UTF-8". If a `String` charset name is needed, `StandardCharsets.UTF_8.name()` is used.

This change also brings the benefit of getting rid of `UnsupportedEncodingException`, as we're providing `Charset` instead of `String` whenever possible.

This also changes some private Catalyst helper methods to operate on encodings as `Charset` objects rather than strings.
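
For illustration, a minimal sketch of the pattern (not the exact Spark code):

```scala
import java.nio.charset.StandardCharsets

// The Charset overload declares no UnsupportedEncodingException, unlike getBytes("UTF-8").
val bytes = "Spark".getBytes(StandardCharsets.UTF_8)
// For APIs that still require a String charset name.
val name = StandardCharsets.UTF_8.name()
```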

## How was this patch tested?

Existing unit tests.

Closes #25335 from HeartSaVioR/SPARK-28601.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-05 20:45:54 -07:00
Wenchen Fan 03e3006312 [SPARK-28213][SQL][FOLLOWUP] code cleanup and bug fix for columnar execution framework
## What changes were proposed in this pull request?

I did a post-hoc review of https://github.com/apache/spark/pull/25008 , and would like to propose some cleanups/fixes/improvements:

1. Do not track the scanTime metrics in `ColumnarToRowExec`. This metrics is specific to file scan, and doesn't make sense for a general batch-to-row operator.
2. Because of 1, we need to track scanTime when building RDDs in the file scan node.
3. use `RDD#mapPartitionsInternal` instead of `flatMap` in several places, as `mapPartitionsInternal` is created for Spark SQL and we use it in almost all the SQL operators.
4. Add `limitNotReachedCond` in `ColumnarToRowExec`. This was in the `ColumnarBatchScan` before and is critical for performance.
5. Clarify the relationship between codegen stages and columnar stages. The whole-stage-codegen framework is completely row-based, so these 2 kinds of stages can NEVER overlap. When they are adjacent, it's either a `RowToColumnarExec` above `WholeStageExec`, or a `ColumnarToRowExec` above the `InputAdapter`.
6. Reuse the `ColumnarBatch` in `RowToColumnarExec`. We don't need to create a new one every time, just need to reset it.
7. Do not skip testing full scan node in `LogicalPlanTagInSparkPlanSuite`
8. Add back the removed tests in `WholeStageCodegenSuite`.

## How was this patch tested?

existing tests

Closes #25264 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:11:18 +08:00
Wenchen Fan 6fb79af48c [SPARK-28344][SQL] detect ambiguous self-join and fail the query
## What changes were proposed in this pull request?

This is an alternative solution to https://github.com/apache/spark/pull/24442 . It fails the query if an ambiguous self-join is detected, instead of trying to disambiguate it. The problem is that it's hard to come up with a reasonable rule to disambiguate; the rule proposed by #24442 is mostly a heuristic.

### background of the self-join problem:
This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:
```
val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result
```
The root cause is that `Dataset.apply` looks so powerful that users think it returns a column reference which always points to the column of that Dataset, wherever it is used. This is not true in many cases. `Dataset.apply` returns an `AttributeReference`, and different Datasets may share the same `AttributeReference`. In the example above, `df2` adds a Filter operator above the logical plan of `df1`, and the Filter operator preserves the output `AttributeReference` of its child. This means `df1("a")` is exactly the same as `df2("a")`, and `df1("a") > df2("a")` always evaluates to false.
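
For reference, a hedged sketch of how users typically disambiguate such joins today (plain aliasing, not something introduced by this PR; a `SparkSession` named `spark` is assumed):

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

// Alias both sides so that each column reference names its side explicitly.
val df1 = Seq(1, 2, 3).toDF("a")
val df2 = df1.filter(col("a") > 1)
val joined = df1.alias("l").join(df2.alias("r"), col("l.a") > col("r.a"))
```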

### The rule to detect ambiguous column reference caused by self join:
We can reuse the infra in #24442 :
1. each Dataset has a globally unique id.
2. the `AttributeReference` returned by `Dataset.apply` carries the ID and column position(e.g. 3rd column of the Dataset) via metadata.
3. the logical plan of a `Dataset` carries the ID via `TreeNodeTag`

When a self-join happens, the analyzer asks the right-side plan of the join to re-generate its output attributes with new exprIds. Based on that, a simple rule to detect an ambiguous self-join is:
1. find all column references (i.e. `AttributeReference`s with Dataset ID and col position) in the root node of a query plan.
2. for each column reference, traverse the query plan tree, find a sub-plan that carries Dataset ID and the ID is the same as the one in the column reference.
3. get the corresponding output attribute of the sub-plan by the col position in the column reference.
4. if the corresponding output attribute has a different exprID than the column reference, then it means this sub-plan is on the right side of a self-join and has regenerated its output attributes. This is an ambiguous self join because the column reference points to a table being self-joined.

## How was this patch tested?

existing tests and new test cases

Closes #25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-06 10:06:36 +08:00
Kousuke Saruta 794804ea5e [SPARK-28537][SQL] DebugExec cannot debug broadcast or columnar related queries
DebugExec does not implement doExecuteBroadcast and doExecuteColumnar, so we can't debug broadcast- or columnar-related queries.

One example for broadcast is here.
```
val df1 = Seq(1, 2, 3).toDF
val df2 = Seq(1, 2, 3).toDF
val joined = df1.join(df2, df1("value") === df2("value"))
joined.debug()

java.lang.UnsupportedOperationException: Debug does not implement doExecuteBroadcast
...
```

Another for columnar is here.
```
val df = Seq(1, 2, 3).toDF
df.persist
df.debug()

java.lang.IllegalStateException: Internal Error class org.apache.spark.sql.execution.debug.package$DebugExec has column support mismatch:
...
```

## How was this patch tested?

Additional test cases in DebuggingSuite.

Closes #25274 from sarutak/fix-debugexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-08-06 08:26:51 +09:00
John Zhuge cae500a255 [SPARK-28178][SQL][FOLLOWUP] DataSourceV2: DataFrameWriter.insertInto
## What changes were proposed in this pull request?

- DataFrameWriter.insertInto should match columns to the target table by position, not by name (see the sketch below).
- Clean up test cases.
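
A minimal sketch of the position-based resolution (the target table name is hypothetical, and a `SparkSession` named `spark` is assumed):

```scala
import spark.implicits._

// insertInto ignores the DataFrame's column names and matches columns to the
// target table's schema by position; `db.tbl` is assumed to already exist.
val df = Seq((1, "a"), (2, "b")).toDF("y", "x")
df.write.insertInto("db.tbl")
```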

## How was this patch tested?

New tests:
- insertInto: append by position
- insertInto: overwrite partitioned table in static mode by position
- insertInto: overwrite partitioned table in dynamic mode by position

Closes #25353 from jzhuge/SPARK-28178-bypos.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-05 16:05:23 +08:00
Yuming Wang 21a18c6490 [SPARK-28614][SQL][TEST] Do not remove leading whitespace in the golden result file
## What changes were proposed in this pull request?

It's hard to know whether a query's results need to be sorted (as [`SQLQueryTestSuite.isSorted`](2ecc39c8d3/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala (L375-L380)) does) when building a test framework for Thriftserver, so we sort both the `outputs` and the `expectedOutputs`. However, we removed leading whitespace in the golden result file, which can lead to inconsistent results.
This PR stops removing leading whitespace in the golden result file. Trailing whitespace still needs to be removed.

## How was this patch tested?

N/A

Closes #25351 from wangyum/SPARK-28614.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-04 17:43:31 -07:00
Sean Owen b148bd5ccb [SPARK-28519][SQL] Use StrictMath log, pow functions for platform independence
## What changes were proposed in this pull request?

See discussion on the JIRA (and dev). At heart, we find that math.log and math.pow can actually return slightly different results across platforms because of hardware optimizations. For the actual SQL log and pow functions, I propose that we should use StrictMath instead to ensure the answers are always the same. (This should have the benefit of helping tests pass on aarch64.)

Further, the atanh function (which is not part of java.lang.Math) can be implemented in a slightly different and more accurate way.
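
For illustration (not the Spark internals), the idea is simply to switch the JDK entry point:

```scala
// java.lang.StrictMath uses the fdlibm algorithms, so results are bit-for-bit
// identical on every platform; java.lang.Math may use faster platform intrinsics.
val a = StrictMath.log(2.5)
val b = StrictMath.pow(2.5, 3.0)
```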

## How was this patch tested?

Existing tests (which will need to be changed).
Some manual testing locally to understand the numeric issues.

Closes #25279 from srowen/SPARK-28519.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:55:44 -05:00
Liang-Chi Hsieh 77c7e91e02 [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression
## What changes were proposed in this pull request?

When PythonUDF is used in group by, and it is also in aggregate expression, like

```
SELECT pyUDF(a + 1), COUNT(b) FROM testData GROUP BY pyUDF(a + 1)
```

It causes analysis exception in `CheckAnalysis`, like
```
org.apache.spark.sql.AnalysisException: expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function.
```

First, `CheckAnalysis` can't check semantic equality between PythonUDFs.
Second, even if we make it possible, a runtime exception will be thrown

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF1#8615
...
Cause: java.lang.RuntimeException: Couldn't find pythonUDF1#8615 in [cast(pythonUDF0#8614 as int)#8617,count(b#8599)#8607L]
```

The cause is that `ExtractPythonUDFs` extracts both PythonUDFs, the one in the group by and the one in the aggregate expression. The two PythonUDFs become two different aliases in the logical aggregate. At runtime, we can't bind the resulting expression in the aggregate to its grouping and aggregate attributes.

This patch proposes a rule, `ExtractGroupingPythonUDFFromAggregate`, to extract PythonUDFs in the group by and evaluate them before the aggregate. We replace the group-by PythonUDF in the aggregate expression with its aliased result.

The query plan of query `SELECT pyUDF(a + 1), pyUDF(COUNT(b)) FROM testData GROUP BY pyUDF(a + 1)`, like

```
== Optimized Logical Plan ==
Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- Aggregate [cast(groupingPythonUDF#8614 as int)], [cast(groupingPythonUDF#8614 as int) AS CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, count(b#8599) AS agg#8613L]
      +- Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
         +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
            +- LocalRelation [a#8598, b#8599]

== Physical Plan ==
*(3) Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- *(2) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int)#8617], functions=[count(b#8599)], output=[CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, agg#8613L])
      +- Exchange hashpartitioning(cast(groupingPythonUDF#8614 as int)#8617, 5), true
         +- *(1) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int) AS cast(groupingPythonUDF#8614 as int)#8617], functions=[partial_count(b#8599)], output=[cast(groupingPythonUDF#8614 as int)#8617, count#8619L])
            +- *(1) Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
               +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
                  +- LocalTableScan [a#8598, b#8599]
```

## How was this patch tested?

Added tests.

Closes #25215 from viirya/SPARK-28445.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 19:47:29 +09:00
Yuming Wang 4e7a4cd20e [SPARK-28521][SQL] Fix error message for built-in functions
## What changes were proposed in this pull request?

```sql
spark-sql> select cast(1);
19/07/26 00:54:17 ERROR SparkSQLDriver: Failed in [select cast(1)]
java.lang.UnsupportedOperationException: empty.init
	at scala.collection.TraversableLike$class.init(TraversableLike.scala:451)
	at scala.collection.mutable.ArrayOps$ofInt.scala$collection$IndexedSeqOptimized$$super$init(ArrayOps.scala:234)
	at scala.collection.IndexedSeqOptimized$class.init(IndexedSeqOptimized.scala:135)
	at scala.collection.mutable.ArrayOps$ofInt.init(ArrayOps.scala:234)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:565)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:558)
	at scala.Option.getOrElse(Option.scala:121)
```

The reason is that we did not handle the case [`validParametersCount.length == 0`](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L588)) because the [parameter types](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L589)) can be `Expression`, `DataType` and `Option`. This PR makes it handle the case `validParametersCount.length == 0`.

## How was this patch tested?

unit tests

Closes #25261 from wangyum/SPARK-28521.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-01 18:02:50 -05:00
Wing Yew Poon 80ab19b9fd [SPARK-26329][CORE] Faster polling of executor memory metrics.
## What changes were proposed in this pull request?

Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval.

For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat.
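
A conceptual sketch of that bookkeeping (the names and shapes below are illustrative only, not the actual Spark internals; the running-task count is omitted for brevity):

```scala
import scala.collection.mutable

// (stageId, stageAttemptId) -> per-metric peak values seen since the last heartbeat.
val stagePeaks = mutable.Map.empty[(Int, Int), Array[Long]]

// Called by the poller: fold the current metric values into every active stage's peaks.
def poll(activeStages: Seq[(Int, Int)], current: Array[Long]): Unit =
  activeStages.foreach { stage =>
    val peaks = stagePeaks.getOrElseUpdate(stage, Array.fill(current.length)(Long.MinValue))
    for (i <- current.indices) peaks(i) = math.max(peaks(i), current(i))
  }

// Called on a heartbeat: report the peaks for the stages active since the last
// heartbeat, then reset them.
def onHeartbeat(): Map[(Int, Int), Array[Long]] = {
  val snapshot = stagePeaks.toMap
  stagePeaks.clear()
  snapshot
}
```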

We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`.

We continue to do the stage-level aggregation in the EventLoggingListener.

For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before.

## How was this patch tested?

Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events.

Closes #23767 from wypoon/wypoon_SPARK-26329.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-08-01 09:09:46 -05:00
WeichenXu 26d03b62e2 [SPARK-28366][CORE] Logging in driver when loading single large unsplittable file
## What changes were proposed in this pull request?

Log a warning in the driver when loading a single large unsplittable file via `sc.textFile` or the csv/json datasource.
The current conditions that trigger the logging are:
* only generate one partition
* file is unsplittable, possible reason is:
   - compressed by unsplittable compression algo such as gzip.
   - multiLine mode in csv/json datasource
   - wholeText mode in text datasource
* file size exceed the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)

## How was this patch tested?

Manually test.
Generate one gzip file exceeding 1GB,
```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```
then launch spark-shell,

run
```
sc.textFile("file:///path/to/file1.gz").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```

run
```
sc.textFile("file:///path/to/file1.txt").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```

run
```
spark.read.csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```

run
```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```

JSON and Text datasource also tested with similar cases.

Closes #25134 from WeichenXu123/log_gz.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-01 20:29:18 +08:00
Marco Gaido ee41001949 [SPARK-26218][SQL] Overflow on arithmetic operations returns incorrect result
## What changes were proposed in this pull request?

When an overflow occurs performing an arithmetic operation, we are returning an incorrect value. Instead, we should throw an exception, as stated in the SQL standard.
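
For context, a tiny Scala illustration of the wrap-around behavior being avoided (plain JVM arithmetic, not Spark code):

```scala
// JVM integer arithmetic silently wraps around on overflow.
val wrapped = Int.MaxValue + 1   // == Int.MinValue (-2147483648), no error is raised
// After this change, the corresponding SQL arithmetic throws an exception
// instead of returning such a wrapped value.
```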

## How was this patch tested?

added UT + existing UTs (improved)

Closes #21599 from mgaido91/SPARK-24598.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-01 14:51:38 +08:00
Yuming Wang 3002a3bf3c [SPARK-28581][SQL] Replace _FUNC_ in UDF ExpressionInfo
## What changes were proposed in this pull request?

This PR moves `replaceFunctionName(usage: String, functionName: String)`
from `DescribeFunctionCommand` to `ExpressionInfo` in order to make `ExpressionInfo` return the actual function name instead of the placeholder. We can then get `ExpressionInfo`s directly through the `SessionCatalog.lookupFunctionInfo` API and see the real names.

## How was this patch tested?

unit tests

Closes #25314 from wangyum/SPARK-28581.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-31 13:08:49 -07:00
Yuming Wang 261e113449 [SPARK-28038][SQL][TEST] Port text.sql
## What changes were proposed in this pull request?

This PR is to port text.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/text.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/text.out

When porting the test cases, I found one PostgreSQL-specific feature that does not exist in Spark SQL:
[SPARK-28037](https://issues.apache.org/jira/browse/SPARK-28037): Add built-in String Functions: quote_literal

Also, I found three inconsistent behaviors:
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Spark SQL's format_string can not fully support PostgreSQL's format
[SPARK-28036](https://issues.apache.org/jira/browse/SPARK-28036):  Built-in udf left/right has inconsistent behavior
[SPARK-28033](https://issues.apache.org/jira/browse/SPARK-28033): String concatenation should low priority than other operators

## How was this patch tested?

N/A

Closes #24862 from wangyum/SPARK-28038.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 11:36:26 +09:00
Yuming Wang 2656c9d304 [SPARK-28071][SQL][TEST] Port strings.sql
## What changes were proposed in this pull request?

This PR is to port strings.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/strings.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/strings.out

When porting the test cases, I found nine PostgreSQL-specific features that do not exist in Spark SQL:
[SPARK-28076](https://issues.apache.org/jira/browse/SPARK-28076): Support regular expression substring
[SPARK-28078](https://issues.apache.org/jira/browse/SPARK-28078):  Add support other 4 REGEXP functions
[SPARK-28412](https://issues.apache.org/jira/browse/SPARK-28412): OVERLAY function support byte array
[SPARK-28083](https://issues.apache.org/jira/browse/SPARK-28083):  ANSI SQL: LIKE predicate: ESCAPE clause
[SPARK-28087](https://issues.apache.org/jira/browse/SPARK-28087):  Add support split_part
[SPARK-28122](https://issues.apache.org/jira/browse/SPARK-28122): Missing `sha224`/`sha256`/`sha384`/`sha512` functions
[SPARK-28123](https://issues.apache.org/jira/browse/SPARK-28123): Add support string functions: btrim
[SPARK-28448](https://issues.apache.org/jira/browse/SPARK-28448): Implement ILIKE operator
[SPARK-28449](https://issues.apache.org/jira/browse/SPARK-28449): Missing escape_string_warning and standard_conforming_strings config

Also, I found five inconsistent behaviors:
[SPARK-27952](https://issues.apache.org/jira/browse/SPARK-27952): String Functions: regexp_replace is not compatible
[SPARK-28121](https://issues.apache.org/jira/browse/SPARK-28121): decode can not accept 'escape' as charset
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Replace `strpos` with `locate` or `position` in Spark SQL
[SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Replace `to_hex` with `hex` in Spark SQL
[SPARK-28451](https://issues.apache.org/jira/browse/SPARK-28451): `substr` returns different values

## How was this patch tested?

N/A

Closes #24923 from wangyum/SPARK-28071.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-07-30 18:54:14 +09:00
John Zhuge 749b1d3a45 [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto
## What changes were proposed in this pull request?

Support multiple catalogs in the following InsertInto use cases:

- DataFrameWriter.insertInto("catalog.db.tbl")

Support matrix:

SaveMode|Partitioned Table|Partition Overwrite Mode|Action
--------|-----------------|------------------------|------
Append|*|*|AppendData
Overwrite|no|*|OverwriteByExpression(true)
Overwrite|yes|STATIC|OverwriteByExpression(true)
Overwrite|yes|DYNAMIC|OverwritePartitionsDynamic

## How was this patch tested?

New tests.
All existing catalyst and sql/core tests.

Closes #24980 from jzhuge/SPARK-28178-pr.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-30 17:22:33 +08:00
Yuming Wang df84bfe6fb [SPARK-28406][SQL][TEST] Port union.sql
## What changes were proposed in this pull request?

This PR is to port union.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/union.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/union.out

When porting the test cases, I found four PostgreSQL-specific features that do not exist in Spark SQL:
[SPARK-28409](https://issues.apache.org/jira/browse/SPARK-28409): SELECT FROM syntax
[SPARK-28298](https://issues.apache.org/jira/browse/SPARK-28298): Fully support char and varchar types
[SPARK-28557](https://issues.apache.org/jira/browse/SPARK-28557): Support empty select list
[SPARK-27767](https://issues.apache.org/jira/browse/SPARK-27767): Built-in function: generate_series

## How was this patch tested?

N/A

Closes #25163 from wangyum/SPARK-28406.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-30 00:14:17 -07:00
Yuming Wang d530d86ab8 [SPARK-28326][SQL][TEST] Port join.sql
## What changes were proposed in this pull request?

This PR is to port join.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/join.sql

The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/join.out

When porting the test cases, I found nine PostgreSQL-specific features that do not exist in Spark SQL:
[SPARK-27877](https://issues.apache.org/jira/browse/SPARK-27877): ANSI SQL: LATERAL derived table(T491)
[SPARK-20856](https://issues.apache.org/jira/browse/SPARK-20856): support statement using nested joins
[SPARK-27987](https://issues.apache.org/jira/browse/SPARK-27987): Support POSIX Regular Expressions
[SPARK-28382](https://issues.apache.org/jira/browse/SPARK-28382): Array Functions: unnest
[SPARK-25411](https://issues.apache.org/jira/browse/SPARK-25411): Implement range partition in Spark
[SPARK-28377](https://issues.apache.org/jira/browse/SPARK-28377): Fully support correlation names in the FROM clause
[SPARK-28330](https://issues.apache.org/jira/browse/SPARK-28330): Enhance query limit
[SPARK-28379](https://issues.apache.org/jira/browse/SPARK-28379): Correlated scalar subqueries must be aggregated
[SPARK-16452](https://issues.apache.org/jira/browse/SPARK-16452): basic INFORMATION_SCHEMA support

## How was this patch tested?

N/A

Closes #25148 from wangyum/SPARK-28326.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-30 00:09:56 -07:00
Shixiong Zhu 196a4d7117 [SPARK-28556][SQL] QueryExecutionListener should also notify Error
## What changes were proposed in this pull request?

Right now `Error` is not sent to `QueryExecutionListener.onFailure`. If there is any `Error` (such as `AssertionError`) when running a query, `QueryExecutionListener.onFailure` cannot be triggered.

This PR changes `onFailure` to accept a `Throwable` instead.
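
A minimal sketch of a listener under the updated signature (a `SparkSession` named `spark` is assumed; error handling is kept trivial):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class LoggingListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName succeeded in ${durationNs / 1e6} ms")

  // Per this change, onFailure receives a Throwable, so JVM Errors such as
  // AssertionError are reported as well, not only Exceptions.
  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit =
    println(s"$funcName failed: $error")
}

// spark.listenerManager.register(new LoggingListener)
```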

## How was this patch tested?

Jenkins

Closes #25292 from zsxwing/fix-QueryExecutionListener.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-30 11:47:36 +09:00
Maxim Gekk caa23e3efd [SPARK-28459][SQL] Add make_timestamp function
## What changes were proposed in this pull request?

The new function `make_timestamp()` takes 6 columns `year`, `month`, `day`, `hour`, `min`, `sec`, plus an optional `timezone`, and makes a new column of the `TIMESTAMP` type. If values in the input columns are `null` or out of the valid ranges, the function returns `null`. The valid ranges are:
- `year` - `[1, 9999]`
- `month` - `[1, 12]`
- `day` - `[1, 31]`
- `hour` - `[0, 23]`
- `min` - `[0, 59]`
- `sec` - `[0, 60]`. If the `sec` argument equals 60, the seconds field is set to 0 and 1 minute is added to the final timestamp.
- `timezone` - an identifier of a time zone. The actual database of time zones can be found here: https://www.iana.org/time-zones.

Also, the constructed timestamp must be valid; otherwise `make_timestamp` returns `null`.

The function is implemented similarly to `make_timestamp` in PostgreSQL: https://www.postgresql.org/docs/11/functions-datetime.html to maintain feature parity with it.

Here is an example:
```sql
select make_timestamp(2014, 12, 28, 6, 30, 45.887);
  2014-12-28 06:30:45.887
select make_timestamp(2014, 12, 28, 6, 30, 45.887, 'CET');
  2014-12-28 10:30:45.887
select make_timestamp(2019, 6, 30, 23, 59, 60);
  2019-07-01 00:00:00
```

Returned value has Spark Catalyst type `TIMESTAMP` which is similar to Oracle's `TIMESTAMP WITH LOCAL TIME ZONE` (see https://docs.oracle.com/cd/B28359_01/server.111/b28298/ch4datetime.htm#i1006169) where data is stored in the session time zone, and the time zone offset is not stored as part of the column data. When users retrieve the data, Spark returns it in the session time zone specified by the SQL config `spark.sql.session.timeZone`.
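
For example, a hedged sketch of how the session time zone interacts with the new function (assuming a running `SparkSession` named `spark`):

```scala
// Assumes a SparkSession `spark`. The stored instant is fixed; only the
// rendered wall-clock time follows spark.sql.session.timeZone.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.sql("SELECT make_timestamp(2014, 12, 28, 6, 30, 45.887, 'CET') AS ts").show(false)
// Expected: 2014-12-28 05:30:45.887, i.e. the CET-local 06:30 rendered in UTC.
```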

## How was this patch tested?

Added new tests to `DateExpressionsSuite`, and uncommented a test for `make_timestamp` in `pgSQL/timestamp.sql`.

Closes #25220 from MaxGekk/make_timestamp.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-29 11:00:08 -07:00
Lee Dongjin d98aa2a184 [MINOR] Trivial cleanups
These are what I found while working on #22282.

- Remove unused value: `UnsafeArraySuite#defaultTz`
- Remove redundant new modifier to the case class, `KafkaSourceRDDPartition`
- Remove unused variables from `RDD.scala`
- Remove trailing space from `structured-streaming-kafka-integration.md`
- Remove redundant parameter from `ArrowConvertersSuite`: `nullable` is `true` by default.
- Remove leading empty line: `UnsafeRow`
- Remove trailing empty line: `KafkaTestUtils`
- Remove unthrown exception type: `UnsafeMapData`
- Replace unused declarations: `expressions`
- Remove duplicated default parameter: `AnalysisErrorSuite`
- `ObjectExpressionsSuite`: remove duplicated parameters, conversions and unused variable

Closes #25251 from dongjinleekr/cleanup/201907.

Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-29 23:38:02 +09:00
Maxim Gekk a5a5da78cf [SPARK-28471][SQL] Replace yyyy by uuuu in date-timestamp patterns without era
## What changes were proposed in this pull request?

In the PR, I propose to use `uuuu` for years instead of `yyyy` in date/timestamp patterns without the era pattern `G` (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). **Parsing/formatting of positive years (current era) will be the same.** The difference is in formatting negative years belong to previous era - BC (Before Christ).

I replaced the `yyyy` pattern by `uuuu` everywhere except:
1. Test, Suite & Benchmark. Existing tests must work as is.
2. `SimpleDateFormat` because it doesn't support the `uuuu` pattern.
3. Comments and examples (except comments related to already replaced patterns).

Before the changes, the common-era year `100` and the BC-era year `-99` were both shown as `100`. After the changes, negative years are formatted with the `-` sign.

Before:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+----------+
|     value|
+----------+
|0100-01-01|
+----------+
```

After:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+-----------+
|      value|
+-----------+
|-0099-01-01|
+-----------+
```

## How was this patch tested?

By existing test suites, and added tests for negative years to `DateFormatterSuite` and `TimestampFormatterSuite`.

Closes #25230 from MaxGekk/year-pattern-uuuu.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-28 20:36:36 -07:00
Dongjoon Hyun a428f40669 [SPARK-28549][BUILD][CORE][SQL] Use text.StringEscapeUtils instead lang3.StringEscapeUtils
## What changes were proposed in this pull request?

`org.apache.commons.lang3.StringEscapeUtils` was deprecated over two years ago at [LANG-1316](https://issues.apache.org/jira/browse/LANG-1316). There have been no bug fixes to it since then.
```java
/**
 * <p>Escapes and unescapes {@code String}s for
 * Java, Java Script, HTML and XML.</p>
 *
 * <p>#ThreadSafe#</p>
 * @since 2.0
 * @deprecated as of 3.6, use commons-text
 * <a href="https://commons.apache.org/proper/commons-text/javadocs/api-release/org/apache/commons/text/StringEscapeUtils.html">
 * StringEscapeUtils</a> instead
 */
@Deprecated
public class StringEscapeUtils {
```

This PR aims to use the latest implementation from the `commons-text` module, which has additional bug fixes such as
[TEXT-100](https://issues.apache.org/jira/browse/TEXT-100), [TEXT-118](https://issues.apache.org/jira/browse/TEXT-118) and [TEXT-120](https://issues.apache.org/jira/browse/TEXT-120), via the following replacement.
```scala
-import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.commons.text.StringEscapeUtils
```
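
As a quick sanity check, a hedged usage sketch; the `commons-text` class should expose the same static API shape for the calls being replaced:

```scala
import org.apache.commons.text.StringEscapeUtils

// Escape a string for embedding in generated Java source or log output;
// the quote and the newline are turned into \" and \n respectively.
val escaped = StringEscapeUtils.escapeJava("a\"b\nc")
println(escaped) // a\"b\nc
```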

This will add a new dependency to the `hadoop-2.7` profile distribution. The `hadoop-3.2` profile already has it.
```
+commons-text-1.6.jar
```

## How was this patch tested?

Pass the Jenkins with the existing tests.
- [Hadoop 2.7](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108281)
- [Hadoop 3.2](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/108282)

Closes #25281 from dongjoon-hyun/SPARK-28549.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-29 11:45:29 +09:00
Kousuke Saruta 6bc5c6a4e7 [SPARK-28520][SQL] WholeStageCodegen does not work property for LocalTableScanExec
Code is not generated for LocalTableScanExec even in situations where it should be.

If a LocalTableScanExec plan has a direct parent plan that supports WholeStageCodegen,
the LocalTableScanExec plan should also be within a WholeStageCodegen domain.
But currently no code is generated for LocalTableScanExec, and an InputAdapter is inserted instead.

```
val df1 = spark.createDataset(1 to 10).toDF
val df2 = spark.createDataset(1 to 10).toDF
val df3 = df1.join(df2, df1("value") === df2("value"))
df3.explain(true)

...

== Physical Plan ==
*(1) BroadcastHashJoin [value#1], [value#6], Inner, BuildRight
:- LocalTableScan [value#1]                                             // LocalTableScanExec is not within a WholeStageCodegen domain
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [value#6]
```

```
scala> df3.queryExecution.executedPlan.children.head.children.head.getClass
res4: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.InputAdapter
```

In the current implementation of LocalTableScanExec, codegen is enabled only when `parent` is not null,
but `parent` is set in `consume`, which is called after `insertInputAdapter`, so it doesn't work as intended.

After applying this change, we get the following plan, which means LocalTableScanExec is within a WholeStageCodegen domain.

```
== Physical Plan ==
*(1) BroadcastHashJoin [value#63], [value#68], Inner, BuildRight
:- *(1) LocalTableScan [value#63]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [value#68]
```

## How was this patch tested?

New test cases are added into WholeStageCodegenSuite.

Closes #25260 from sarutak/localtablescan-improvement.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-07-29 08:35:25 +09:00
Huaxin Gao 3c5278748d [SPARK-28277][SQL][PYTHON][TESTS][FOLLOW-UP] Re-enable commented out test
## What changes were proposed in this pull request?

The fix for `SPARK-28441` (PythonUDF used in a correlated scalar subquery causes UnsupportedOperationException) is in. Re-enable the commented-out test for `udf(max(udf(column)))`.

## How was this patch tested?

Use the existing test `udf-except.sql`.

Closes #25278 from huaxingao/spark-28277n.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-28 15:52:31 -07:00
shahid 485ae6d181 [SPARK-25474][SQL] Support spark.sql.statistics.fallBackToHdfs in data source tables
For a CatalogFileIndex data source table, sizeInBytes always comes back as the default size, 8.0 EB (even when the user sets fallBackToHdfsForStatsEnabled=true). So a data source table backed by CatalogFileIndex always prefers SortMergeJoin over BroadcastJoin, even when its size is below the broadcast join threshold.
In this PR, for a CatalogFileIndex table, if "fallBackToHdfsForStatsEnabled=true" is enabled, computeStatistics gets sizeInBytes from HDFS, so we get the actual size of the table. Hence, during a join operation, when the table size is below the broadcast threshold, BroadcastHashJoin is preferred over SortMergeJoin.
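
For illustration, a hedged sketch of opting in (assuming a `SparkSession` named `spark`; the config key comes from the title above and the table names are illustrative):

```scala
// Assumes `small_ds_table` is a CatalogFileIndex-backed data source table.
// With the fallback enabled, sizeInBytes is computed from the files on HDFS
// instead of defaulting to 8.0 EB, so small tables can qualify for broadcast.
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
spark.table("small_ds_table").join(spark.table("big_table"), "id").explain()
// Expected: BroadcastHashJoin once the real size is below the broadcast threshold.
```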

Added UT

Closes #22502 from shahidki31/SPARK-25474.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-28 15:35:37 -07:00
Dongjoon Hyun d943ee0a88 [SPARK-28545][SQL] Add the hash map size to the directional log of ObjectAggregationIterator
## What changes were proposed in this pull request?

`ObjectAggregationIterator` shows a directional info message to increase `spark.sql.objectHashAggregate.sortBased.fallbackThreshold` when the size of the in-memory hash map grows too large and it falls back to sort-based aggregation.
However, the user doesn't know how much to increase it by. This PR adds the current in-memory hash map size to the log message.
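
With the size in the log, the threshold can be raised to a concrete value instead of guessing, e.g. (hedged sketch, assuming a `SparkSession` named `spark`):

```scala
// The message now reports the in-memory hash map size, so pick a threshold
// comfortably above it (the config key is quoted from the log message above).
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1024")
```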

**BEFORE**
```
15:21:41.669 Executor task launch worker for task 0 INFO
ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (2 entries), ...
```

**AFTER**
```
15:20:05.742 Executor task launch worker for task 0 INFO
ObjectAggregationIterator: Aggregation hash map size 2 reaches threshold capacity (2 entries), ...
```

## How was this patch tested?

Manual. For example, run `ObjectHashAggregateSuite.scala`'s `typed_count fallback to sort-based aggregation` and search the above message in `target/unit-tests.log`.

Closes #25276 from dongjoon-hyun/SPARK-28545.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-27 18:55:36 -07:00
Yuming Wang 9eb541be22 [SPARK-28424][SQL] Support typed interval expression
## What changes were proposed in this pull request?

This PR adds support for typed `interval` expressions:
```sql
spark-sql> select interval 'interval 3 year 1 hour';
interval 3 years 1 hours
spark-sql>
```

Please note that this PR does not add a cast alias for the `interval` type like [other types](2d74f14d74/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (L529-L541)) because neither PostgreSQL nor Hive supports this syntax.

## How was this patch tested?

unit tests

Closes #25241 from wangyum/SPARK-28424.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-27 14:25:35 -07:00
HyukjinKwon 8ce1ae52db [SPARK-28536][SQL][PYTHON][TESTS] Reduce shuffle partitions in Python UDF tests in SQLQueryTestSuite
## What changes were proposed in this pull request?

In Python UDF tests, the number of shuffle partitions considerably affects the testing time, because each partition requires forking and communicating with external Python processes.
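
For reference, the knob involved is the regular shuffle-partitions setting; a hedged sketch of pinning it down for a test session (the suite applies an equivalent setting internally):

```scala
// Fewer shuffle partitions means fewer tasks that each have to fork and talk
// to an external Python worker, which is where most of the test time went.
spark.conf.set("spark.sql.shuffle.partitions", "4")
```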

**Before:**

![image](https://user-images.githubusercontent.com/6477701/61989374-465c0080-b069-11e9-9936-b386d0cccf7a.png)

**After: (with 4)**

![Screen Shot 2019-07-27 at 10 43 34 AM](https://user-images.githubusercontent.com/9700541/61997757-743a4880-b05b-11e9-9180-8d0976bda3bd.png)

## How was this patch tested?

Manually tested in my local.

**Before:**

```
[info] SQLQueryTestSuite:
[info] - udf/udf-window.sql - Scala UDF (58 seconds, 558 milliseconds)
[info] - udf/udf-window.sql - Regular Python UDF (58 seconds, 371 milliseconds)
[info] - udf/udf-window.sql - Scalar Pandas UDF (1 minute, 8 seconds)
```

**After:**

```
[info] SQLQueryTestSuite:
[info] - udf/udf-window.sql - Scala UDF (14 seconds, 690 milliseconds)
[info] - udf/udf-window.sql - Regular Python UDF (10 seconds, 467 milliseconds)
[info] - udf/udf-window.sql - Scalar Pandas UDF (10 seconds, 895 milliseconds)
```

Closes #25271 from HyukjinKwon/SPARK-28536.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-27 10:46:35 -07:00
HyukjinKwon 1856ee3b92 [SPARK-28441][SQL][TESTS][FOLLOW-UP] Skip Python tests if python executable and pyspark library are unavailable
## What changes were proposed in this pull request?

We should add `assume(shouldTestPythonUDFs)`. It may not be a big deal in general, but it can matter in other vendors' testing bases. For instance, if somebody launches a test in a minimal Docker image, the tests might suddenly fail.

This kind of skipping isn't completely new in our test base; see `TestUtils.testCommandAvailable`, for instance.
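
A hedged sketch of the guard pattern (ScalaTest; the detection logic below is a stand-in, the real flag in the suite is `shouldTestPythonUDFs`):

```scala
import org.scalatest.FunSuite

class PythonUDFGuardSketch extends FunSuite {
  // Stand-in for the real detection (which checks the python executable and
  // the pyspark library); replace with the suite's own shouldTestPythonUDFs.
  private def pythonAvailable: Boolean = sys.env.contains("PYSPARK_PYTHON")

  test("udf query file - Regular Python UDF") {
    assume(pythonAvailable) // cancels (skips) the test instead of failing it
    // ... run the Python UDF query file here ...
  }
}
```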

## How was this patch tested?

Manually tested.

Closes #25272 from HyukjinKwon/SPARK-28441.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-27 15:56:12 +09:00
Liang-Chi Hsieh 558dd23601 [SPARK-28441][SQL][PYTHON] Fix error when non-foldable expression is used in correlated scalar subquery
## What changes were proposed in this pull request?

In SPARK-15370, we checked the expression at the root of the correlated subquery in order to fix the count bug. If a `PythonUDF` is in the checking path, evaluating it causes a failure because we can't statically evaluate `PythonUDF`. The Python UDF test added in SPARK-28277 shows this issue.

If we can statically evaluate the expression, we intercept NULL values coming from the outer join and replace them with the value of the subquery's expression, as before. If we cannot, we replace them with the `PythonUDF` expression, with statically evaluated parameters.

After this, the last query in `udf-except.sql` which throws `java.lang.UnsupportedOperationException` can be run:

```
SELECT t1.k
FROM   t1
WHERE  t1.v <= (SELECT   udf(max(udf(t2.v)))
                FROM     t2
                WHERE    udf(t2.k) = udf(t1.k))
MINUS
SELECT t1.k
FROM   t1
WHERE  udf(t1.v) >= (SELECT   min(udf(t2.v))
                FROM     t2
                WHERE    t2.k = t1.k)
-- !query 2 schema
struct<k:string>
-- !query 2 output
two
```

Note that this issue also applies to other non-foldable expressions, like `rand`. As with `PythonUDF`, we can't call `eval` on this kind of expression during optimization; the evaluation needs to be deferred to query runtime.

## How was this patch tested?

Added tests.

Closes #25204 from viirya/SPARK-28441.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-27 10:38:34 +08:00
Yuming Wang 836a8ff2b9 [SPARK-28518][SQL][TEST] Refer to ChecksumFileSystem#isChecksumFile to fix StatisticsCollectionTestBase#getDataSize
## What changes were proposed in this pull request?

This PR fixes [StatisticsCollectionTestBase.getDataSize](8158d5e27f/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala (L298-L304)) by referring to [ChecksumFileSystem.isChecksumFile](https://github.com/apache/hadoop/blob/release-2.7.4-RC0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java#L93-L97).

More details: https://github.com/apache/spark/pull/25014#discussion_r307050435
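
For context, `ChecksumFileSystem` names its checksum files `.<file>.crc`; a hedged sketch of the data-size computation with that filter applied (local-filesystem flavour, not the suite's exact code):

```scala
import java.io.File

// Hedged sketch: sum only the data files, skipping Hadoop local-FS checksum
// files, which are hidden files named ".<name>.crc" per ChecksumFileSystem.
def getDataSize(dir: File): Long =
  dir.listFiles()
    .filter(f => f.isFile && !(f.getName.startsWith(".") && f.getName.endsWith(".crc")))
    .map(_.length())
    .sum
```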

## How was this patch tested?

unit tests

Closes #25259 from wangyum/SPARK-28518.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-26 14:48:24 -07:00
Yiheng Wang 6361467bde [SPARK-28289][SQL][PYTHON][TESTS] Convert and port 'union.sql' into UDF test base
## What changes were proposed in this pull request?
This PR adds some tests converted from 'union.sql' to test UDFs

<details><summary>Diff comparing to 'union.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-union.sql.out
index b023df825d..84b5e10dbe 100644
--- a/sql/core/src/test/resources/sql-tests/results/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-union.sql.out
 -19,10 +19,10  struct<>

 -- !query 2
-SELECT *
-FROM   (SELECT * FROM t1
+SELECT udf(c1) as c1, udf(c2) as c2
+FROM   (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
         UNION ALL
-        SELECT * FROM t1)
+        SELECT udf(c1) as c1, udf(c2) as c2 FROM t1)
 -- !query 2 schema
 struct<c1:int,c2:string>
 -- !query 2 output
 -33,12 +33,12  struct<c1:int,c2:string>

 -- !query 3
-SELECT *
-FROM   (SELECT * FROM t1
+SELECT udf(c1) as c1, udf(c2) as c2
+FROM   (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
         UNION ALL
-        SELECT * FROM t2
+        SELECT udf(c1) as c1, udf(c2) as c2 FROM t2
         UNION ALL
-        SELECT * FROM t2)
+        SELECT udf(c1) as c1, udf(c2) as c2 FROM t2)
 -- !query 3 schema
 struct<c1:decimal(11,1),c2:string>
 -- !query 3 output
 -51,11 +51,11  struct<c1:decimal(11,1),c2:string>

 -- !query 4
-SELECT a
-FROM (SELECT 0 a, 0 b
+SELECT udf(udf(a)) as a
+FROM (SELECT udf(0) a, udf(0) b
       UNION ALL
-      SELECT SUM(1) a, CAST(0 AS BIGINT) b
-      UNION ALL SELECT 0 a, 0 b) T
+      SELECT udf(SUM(1)) a, udf(CAST(0 AS BIGINT)) b
+      UNION ALL SELECT udf(0) a, udf(0) b) T
 -- !query 4 schema
 struct<a:bigint>
 -- !query 4 output
 -89,13 +89,13  struct<>

 -- !query 8
-SELECT 1 AS x,
-       col
-FROM   (SELECT col AS col
-        FROM (SELECT p1.col AS col
+SELECT udf(1) AS x,
+       udf(col) as col
+FROM   (SELECT udf(col) AS col
+        FROM (SELECT udf(p1.col) AS col
               FROM   p1 CROSS JOIN p2
               UNION ALL
-              SELECT col
+              SELECT udf(col)
               FROM p3) T1) T2
 -- !query 8 schema
 struct<x:int,col:int>
 -105,9 +105,9  struct<x:int,col:int>

 -- !query 9
-SELECT map(1, 2), 'str'
+SELECT map(1, 2), udf('str') as str
 UNION ALL
-SELECT map(1, 2, 3, NULL), 1
+SELECT map(1, 2, 3, NULL), udf(1)
 -- !query 9 schema
 struct<map(1, 2):map<int,int>,str:string>
 -- !query 9 output
 -116,9 +116,9  struct<map(1, 2):map<int,int>,str:string>

 -- !query 10
-SELECT array(1, 2), 'str'
+SELECT array(1, 2), udf('str') as str
 UNION ALL
-SELECT array(1, 2, 3, NULL), 1
+SELECT array(1, 2, 3, NULL), udf(1)
 -- !query 10 schema
 struct<array(1, 2):array<int>,str:string>
 -- !query 10 output
```

</p>
</details>

## How was this patch tested?
Tested as guided in SPARK-27921.

Closes #25202 from yiheng/fix_28289.

Authored-by: Yiheng Wang <yihengw@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-26 12:05:45 +09:00
Dongjoon Hyun cefce21acc [MINOR][SQL] Fix log messages of DataWritingSparkTask
## What changes were proposed in this pull request?

This PR fixes log messages like `attempt 0stage 9.0` by adding a comma followed by a space. These are all instances in `DataWritingSparkTask`, which was introduced at 6d16b9885d. This should be fixed in `branch-2.4`, too.
```
19/07/25 18:35:01 INFO DataWritingSparkTask: Commit authorized for partition 65 (task 153, attempt 0stage 9.0)
19/07/25 18:35:01 INFO DataWritingSparkTask: Committed partition 65 (task 153, attempt 0stage 9.0)
```

## How was this patch tested?

This only changes log messages. Pass the Jenkins with the existing tests.

Closes #25257 from dongjoon-hyun/DataWritingSparkTask.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-26 09:25:13 +09:00
Ryan Blue 443904a140 [SPARK-27845][SQL] DataSourceV2: InsertTable
## What changes were proposed in this pull request?

Support multiple catalogs in the following InsertTable use cases:

- INSERT INTO [TABLE] catalog.db.tbl
- INSERT OVERWRITE TABLE catalog.db.tbl
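
For example (hedged sketch; assumes a `SparkSession` named `spark` with a v2 catalog named `testcat` configured, a partitioned table `testcat.db.tbl(id, p1, p2)`, and a source view `src` — all names are illustrative):

```scala
// Append rows to a v2 table through its catalog-qualified name.
spark.sql("INSERT INTO testcat.db.tbl SELECT id, p1, p2 FROM src")

// Overwrite; whether this becomes OverwriteByExpression or
// OverwritePartitionsDynamic follows the support matrix below.
spark.sql("INSERT OVERWRITE TABLE testcat.db.tbl SELECT id, p1, p2 FROM src")
```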

Support matrix:

Overwrite|Partitioned Table|Partition Clause |Partition Overwrite Mode|Action
---------|-----------------|-----------------|------------------------|-----
false|*|*|*|AppendData
true|no|(empty)|*|OverwriteByExpression(true)
true|yes|p1,p2 or p1 or p2 or (empty)|STATIC|OverwriteByExpression(true)
true|yes|p1,p2 or p1 or p2 or (empty)|DYNAMIC|OverwritePartitionsDynamic
true|yes|p1=23,p2=3|*|OverwriteByExpression(p1=23 and p2=3)
true|yes|p1=23,p2 or p1=23|STATIC|OverwriteByExpression(p1=23)
true|yes|p1=23,p2 or p1=23|DYNAMIC|OverwritePartitionsDynamic

Notes:
- Assume the partitioned table has 2 partitions: p1 and p2.
- `STATIC` is the default Partition Overwrite Mode for data source tables.
- DSv2 tables currently do not support `IfPartitionNotExists`.

## How was this patch tested?

New tests.
All existing catalyst and sql/core tests.

Closes #24832 from jzhuge/SPARK-27845-pr.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
2019-07-25 15:05:51 -07:00
younggyu chun 89fd2b5efc [SPARK-28288][SQL][PYTHON][TESTS] Convert and port 'window.sql' into UDF test base
## What changes were proposed in this pull request?
This PR adds some tests converted from window.sql to test UDFs. Please see the contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

<details><summary>Diff comparing to 'window.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out
index 367dc4f513..9354d5e311 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out
 -21,10 +21,10  struct<>

 -- !query 1
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData
-ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY udf(val) ROWS CURRENT ROW) FROM testData
+ORDER BY cate, udf(val)
 -- !query 1 schema
-struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND CURRENT ROW):bigint>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY CAST(udf(cast(val as string)) AS INT) ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND CURRENT ROW):bigint>
 -- !query 1 output
 NULL   NULL    0
 3      NULL    1
 -38,10 +38,10  NULL        a       0

 -- !query 2
-SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
-ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, sum(val) OVER(PARTITION BY cate ORDER BY udf(val)
+ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val)
 -- !query 2 schema
-struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING):bigint>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY CAST(udf(cast(val as string)) AS INT) ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING):bigint>
 -- !query 2 output
 NULL   NULL    3
 3      NULL    3
 -55,20 +55,20  NULL        a       1

 -- !query 3
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
+SELECT val_long, udf(cate), sum(val_long) OVER(PARTITION BY cate ORDER BY udf(val_long)
+ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY udf(cate), val_long
 -- !query 3 schema
 struct<>
 -- !query 3 output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN CURRENT ROW AND 2147483648L FOLLOWING' due to data type mismatch: The data type of the upper bound 'bigint' does not match the expected data type 'int'.; line 1 pos 41
+cannot resolve 'ROWS BETWEEN CURRENT ROW AND 2147483648L FOLLOWING' due to data type mismatch: The data type of the upper bound 'bigint' does not match the expected data type 'int'.; line 1 pos 46

 -- !query 4
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData
-ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY val RANGE 1 PRECEDING) FROM testData
+ORDER BY cate, udf(val)
 -- !query 4 schema
-struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,count(val) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint>
 -- !query 4 output
 NULL   NULL    0
 3      NULL    1
 -82,10 +82,10  NULL        a       0

 -- !query 5
-SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT val, udf(cate), sum(val) OVER(PARTITION BY udf(cate) ORDER BY val
+RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY udf(cate), val
 -- !query 5 schema
-struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
+struct<val:int,CAST(udf(cast(cate as string)) AS STRING):string,sum(val) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
 -- !query 5 output
 NULL   NULL    NULL
 3      NULL    3
 -99,10 +99,10  NULL        a       NULL

 -- !query 6
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
+SELECT val_long, udf(cate), sum(val_long) OVER(PARTITION BY udf(cate) ORDER BY val_long
+RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY udf(cate), val_long
 -- !query 6 schema
-struct<val_long:bigint,cate:string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint>
+struct<val_long:bigint,CAST(udf(cast(cate as string)) AS STRING):string,sum(val_long) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint>
 -- !query 6 output
 NULL   NULL    NULL
 1      NULL    1
 -116,10 +116,10  NULL      b       NULL

 -- !query 7
-SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double
-RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double
+SELECT val_double, udf(cate), sum(val_double) OVER(PARTITION BY udf(cate) ORDER BY val_double
+RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY udf(cate), val_double
 -- !query 7 schema
-struct<val_double:double,cate:string,sum(val_double) OVER (PARTITION BY cate ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double>
+struct<val_double:double,CAST(udf(cast(cate as string)) AS STRING):string,sum(val_double) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double>
 -- !query 7 output
 NULL   NULL    NULL
 1.0    NULL    1.0
 -133,10 +133,10  NULL      NULL    NULL

 -- !query 8
-SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
-RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date
+SELECT val_date, udf(cate), max(val_date) OVER(PARTITION BY udf(cate) ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY udf(cate), val_date
 -- !query 8 schema
-struct<val_date:date,cate:string,max(val_date) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date>
+struct<val_date:date,CAST(udf(cast(cate as string)) AS STRING):string,max(val_date) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date>
 -- !query 8 output
 NULL   NULL    NULL
 2017-08-01     NULL    2017-08-01
 -150,11 +150,11  NULL      NULL    NULL

 -- !query 9
-SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+SELECT val_timestamp, udf(cate), avg(val_timestamp) OVER(PARTITION BY udf(cate) ORDER BY val_timestamp
 RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
-ORDER BY cate, val_timestamp
+ORDER BY udf(cate), val_timestamp
 -- !query 9 schema
-struct<val_timestamp:timestamp,cate:string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double>
+struct<val_timestamp:timestamp,CAST(udf(cast(cate as string)) AS STRING):string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double>
 -- !query 9 output
 NULL   NULL    NULL
 2017-07-31 17:00:00    NULL    1.5015456E9
 -168,10 +168,10  NULL      NULL    NULL

 -- !query 10
-SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
+SELECT val, udf(cate), sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
 -- !query 10 schema
-struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
+struct<val:int,CAST(udf(cast(cate as string)) AS STRING):string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
 -- !query 10 output
 NULL   NULL    NULL
 3      NULL    3
 -185,58 +185,58  NULL      a       NULL

 -- !query 11
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate)
+ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val)
 -- !query 11 schema
 struct<>
 -- !query 11 output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33
+cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 38

 -- !query 12
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate)
+RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val)
 -- !query 12 schema
 struct<>
 -- !query 12 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33
+cannot resolve '(PARTITION BY CAST(udf(cast(cate as string)) AS STRING) RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 38

 -- !query 13
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY udf(val), cate
+RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val)
 -- !query 13 schema
 struct<>
 -- !query 13 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
+cannot resolve '(PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY CAST(udf(cast(val as string)) AS INT) ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: cast(udf(cast(val#x as string)) as int) ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 38

 -- !query 14
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY current_timestamp
+RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val)
 -- !query 14 schema
 struct<>
 -- !query 14 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'int' which is used in the range frame.; line 1 pos 33
+cannot resolve '(PARTITION BY CAST(udf(cast(cate as string)) AS STRING) ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'int' which is used in the range frame.; line 1 pos 38

 -- !query 15
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY val
+RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY udf(cate), val
 -- !query 15 schema
 struct<>
 -- !query 15 output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
+cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 38

 -- !query 16
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY udf(val)
+RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val(val)
 -- !query 16 schema
 struct<>
 -- !query 16 output
 -245,48 +245,48  org.apache.spark.sql.catalyst.parser.ParseException
 Frame bound value must be a literal.(line 2, pos 30)

 == SQL ==
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, count(val) OVER(PARTITION BY udf(cate) ORDER BY udf(val)
+RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val(val)
 ------------------------------^^^

 -- !query 17
-SELECT val, cate,
-max(val) OVER w AS max,
-min(val) OVER w AS min,
-min(val) OVER w AS min,
-count(val) OVER w AS count,
-sum(val) OVER w AS sum,
-avg(val) OVER w AS avg,
-stddev(val) OVER w AS stddev,
-first_value(val) OVER w AS first_value,
-first_value(val, true) OVER w AS first_value_ignore_null,
-first_value(val, false) OVER w AS first_value_contain_null,
-last_value(val) OVER w AS last_value,
-last_value(val, true) OVER w AS last_value_ignore_null,
-last_value(val, false) OVER w AS last_value_contain_null,
+SELECT udf(val), cate,
+max(udf(val)) OVER w AS max,
+min(udf(val)) OVER w AS min,
+min(udf(val)) OVER w AS min,
+count(udf(val)) OVER w AS count,
+sum(udf(val)) OVER w AS sum,
+avg(udf(val)) OVER w AS avg,
+stddev(udf(val)) OVER w AS stddev,
+first_value(udf(val)) OVER w AS first_value,
+first_value(udf(val), true) OVER w AS first_value_ignore_null,
+first_value(udf(val), false) OVER w AS first_value_contain_null,
+last_value(udf(val)) OVER w AS last_value,
+last_value(udf(val), true) OVER w AS last_value_ignore_null,
+last_value(udf(val), false) OVER w AS last_value_contain_null,
 rank() OVER w AS rank,
 dense_rank() OVER w AS dense_rank,
 cume_dist() OVER w AS cume_dist,
 percent_rank() OVER w AS percent_rank,
 ntile(2) OVER w AS ntile,
 row_number() OVER w AS row_number,
-var_pop(val) OVER w AS var_pop,
-var_samp(val) OVER w AS var_samp,
-approx_count_distinct(val) OVER w AS approx_count_distinct,
-covar_pop(val, val_long) OVER w AS covar_pop,
-corr(val, val_long) OVER w AS corr,
-stddev_samp(val) OVER w AS stddev_samp,
-stddev_pop(val) OVER w AS stddev_pop,
-collect_list(val) OVER w AS collect_list,
-collect_set(val) OVER w AS collect_set,
-skewness(val_double) OVER w AS skewness,
-kurtosis(val_double) OVER w AS kurtosis
+var_pop(udf(val)) OVER w AS var_pop,
+var_samp(udf(val)) OVER w AS var_samp,
+approx_count_distinct(udf(val)) OVER w AS approx_count_distinct,
+covar_pop(udf(val), udf(val_long)) OVER w AS covar_pop,
+corr(udf(val), udf(val_long)) OVER w AS corr,
+stddev_samp(udf(val)) OVER w AS stddev_samp,
+stddev_pop(udf(val)) OVER w AS stddev_pop,
+collect_list(udf(val)) OVER w AS collect_list,
+collect_set(udf(val)) OVER w AS collect_set,
+skewness(udf(val_double)) OVER w AS skewness,
+kurtosis(udf(val_double)) OVER w AS kurtosis
 FROM testData
-WINDOW w AS (PARTITION BY cate ORDER BY val)
-ORDER BY cate, val
+WINDOW w AS (PARTITION BY udf(cate) ORDER BY udf(val))
+ORDER BY cate, udf(val)
 -- !query 17 schema
-struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint,covar_pop:double,corr:double,stddev_samp:double,stddev_pop:double,collect_list:array<int>,collect_set:array<int>,skewness:double,kurtosis:double>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint,covar_pop:double,corr:double,stddev_samp:double,stddev_pop:double,collect_list:array<int>,collect_set:array<int>,skewness:double,kurtosis:double>
 -- !query 17 output
 NULL   NULL    NULL    NULL    NULL    0       NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    1       1       0.5     0.0     1       1       NULL    NULL    0       NULL    NULL
    NULL    NULL    []      []      NULL    NULL
 3      NULL    3       3       3       1       3       3.0     NaN     NULL    3       NULL    3       3       3       2       2       1.0     1.0     2       2       0.0     NaN     1       0.0     NaN
     NaN     0.0     [3]     [3]     NaN     NaN
 -300,9 +300,9  NULL        a       NULL    NULL    NULL    0       NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    1       1       0.25    0.

 -- !query 18
-SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val
 -- !query 18 schema
-struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
 -- !query 18 output
 NULL   NULL    NULL
 3      NULL    NULL
 -316,7 +316,7  NULL        a       NULL

 -- !query 19
-SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
+SELECT udf(val), cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, udf(val)
 -- !query 19 schema
 struct<>
 -- !query 19 output
 -325,9 +325,9  Window function row_number() requires window to be ordered, please add ORDER BY

 -- !query 20
-SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
+SELECT udf(val), cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
 -- !query 20 schema
-struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
 -- !query 20 output
 NULL   NULL    13      1.8571428571428572
 3      NULL    13      1.8571428571428572
 -341,7 +341,7  NULL        a       13      1.8571428571428572

 -- !query 21
-SELECT val, cate,
+SELECT udf(val), cate,
 first_value(false) OVER w AS first_value,
 first_value(true, true) OVER w AS first_value_ignore_null,
 first_value(false, false) OVER w AS first_value_contain_null,
 -352,7 +352,7  FROM testData
 WINDOW w AS ()
 ORDER BY cate, val
 -- !query 21 schema
-struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
 -- !query 21 output
 NULL   NULL    false   true    false   false   true    false
 3      NULL    false   true    false   false   true    false
 -366,12 +366,12  NULL      a       false   true    false   false   true    false

 -- !query 22
-SELECT cate, sum(val) OVER (w)
+SELECT udf(cate), sum(val) OVER (w)
 FROM testData
 WHERE val is not null
 WINDOW w AS (PARTITION BY cate ORDER BY val)
 -- !query 22 schema
-struct<cate:string,sum(CAST(val AS BIGINT)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
+struct<CAST(udf(cast(cate as string)) AS STRING):string,sum(CAST(val AS BIGINT)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
 -- !query 22 output
 NULL   3
 a      2
```

</p>
</details>

## How was this patch tested?
Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

Closes #25195 from younggyuchun/master.

Authored-by: younggyu chun <younggyuchun@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-25 22:32:28 +09:00
Gengliang Wang b367b323d2 [SPARK-28497][SQL] Disallow upcasting complex data types to string type
## What changes were proposed in this pull request?

In the current implementation, complex types like Array/Map/StructType are allowed to be upcast to StringType.
This is not a safe cast, so we should disallow it.
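
For illustration, a hedged sketch of the kind of conversion this affects (assuming the up-cast path is the one used by `Dataset` encoder resolution; the exact error text may differ):

```scala
// Assumes a SparkSession `spark` with implicits imported.
import spark.implicits._

val df = Seq((1, Seq(1, 2, 3))).toDF("id", "arr")

// Re-typing the array column as String relies on an implicit up-cast.
// With complex-to-string up-casts disallowed, this should fail analysis
// instead of silently stringifying the array.
val ds = df.as[(Int, String)] // expected: AnalysisException ("cannot up cast ...")
```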

## How was this patch tested?

Update the existing test case

Closes #25242 from gengliangwang/fixUpCastStringType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-25 20:55:01 +09:00
shivusondur 167fa0402d [SPARK-28390][SQL][PYTHON][TESTS] Convert and port 'pgSQL/select_having.sql' into UDF test base
## What changes were proposed in this pull request?
Changed the test according to the steps mentioned in SPARK-27921.

<details>
<summary>difference comparing to select_having.sql</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_having.sql.out
index 02536eb..f731d11 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_having.sql.out
 -91,54 +91,54  struct<>

 -- !query 11
-SELECT b, c FROM test_having
-	GROUP BY b, c HAVING count(*) = 1 ORDER BY b, c
+SELECT udf(b), udf(c) FROM test_having
+	GROUP BY b, c HAVING udf(count(*)) = 1 ORDER BY udf(b), udf(c)
 -- !query 11 schema
-struct<b:int,c:string>
+struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(c as string)) AS STRING):string>
 -- !query 11 output
 1	XXXX
 3	bbbb

 -- !query 12
-SELECT b, c FROM test_having
-	GROUP BY b, c HAVING b = 3 ORDER BY b, c
+SELECT udf(b), udf(c) FROM test_having
+	GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
 -- !query 12 schema
-struct<b:int,c:string>
+struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(c as string)) AS STRING):string>
 -- !query 12 output
 3	BBBB
 3	bbbb

 -- !query 13
-SELECT c, max(a) FROM test_having
-	GROUP BY c HAVING count(*) > 2 OR min(a) = max(a)
+SELECT udf(c), max(udf(a)) FROM test_having
+	GROUP BY c HAVING udf(count(*)) > 2 OR udf(min(a)) = udf(max(a))
 	ORDER BY c
 -- !query 13 schema
-struct<c:string,max(a):int>
+struct<CAST(udf(cast(c as string)) AS STRING):string,max(CAST(udf(cast(a as string)) AS INT)):int>
 -- !query 13 output
 XXXX	0
 bbbb	5

 -- !query 14
-SELECT min(a), max(a) FROM test_having HAVING min(a) = max(a)
+SELECT udf(udf(min(udf(a)))), udf(udf(max(udf(a)))) FROM test_having HAVING udf(udf(min(udf(a)))) = udf(udf(max(udf(a))))
 -- !query 14 schema
-struct<min(a):int,max(a):int>
+struct<CAST(udf(cast(cast(udf(cast(min(cast(udf(cast(a as string)) as int)) as string)) as int) as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(max(cast(udf(cast(a as string)) as int)) as string)) as int) as string)) AS INT):int>
 -- !query 14 output

 -- !query 15
-SELECT min(a), max(a) FROM test_having HAVING min(a) < max(a)
+SELECT udf(min(udf(a))), udf(udf(max(a))) FROM test_having HAVING udf(min(a)) < udf(max(udf(a)))
 -- !query 15 schema
-struct<min(a):int,max(a):int>
+struct<CAST(udf(cast(min(cast(udf(cast(a as string)) as int)) as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(max(a) as string)) as int) as string)) AS INT):int>
 -- !query 15 output
 0	9

 -- !query 16
-SELECT a FROM test_having HAVING min(a) < max(a)
+SELECT udf(a) FROM test_having HAVING udf(min(a)) < udf(max(a))
 -- !query 16 schema
 struct<>
 -- !query 16 output
 -147,16 +147,16  grouping expressions sequence is empty, and 'default.test_having.`a`' is not an

 -- !query 17
-SELECT 1 AS one FROM test_having HAVING a > 1
+SELECT 1 AS one FROM test_having HAVING udf(a) > 1
 -- !query 17 schema
 struct<>
 -- !query 17 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`a`' given input columns: [one]; line 1 pos 40
+cannot resolve '`a`' given input columns: [one]; line 1 pos 44

 -- !query 18
-SELECT 1 AS one FROM test_having HAVING 1 > 2
+SELECT 1 AS one FROM test_having HAVING udf(udf(1) > udf(2))
 -- !query 18 schema
 struct<one:int>
 -- !query 18 output
 -164,7 +164,7  struct<one:int>

 -- !query 19
-SELECT 1 AS one FROM test_having HAVING 1 < 2
+SELECT 1 AS one FROM test_having HAVING udf(udf(1) < udf(2))
 -- !query 19 schema
 struct<one:int>
 -- !query 19 output
 -172,7 +172,7  struct<one:int>

 -- !query 20
-SELECT 1 AS one FROM test_having WHERE 1/a = 1 HAVING 1 < 2
+SELECT 1 AS one FROM test_having WHERE 1/udf(a) = 1 HAVING 1 < 2
 -- !query 20 schema
 struct<one:int>
 -- !query 20 output
```
</p>
</details>

## How was this patch tested?
by:

```bash
sudo SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/pgSQL/udf-select_having.sql"
```

Closes #25161 from shivusondur/jira28390.

Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-24 14:43:39 +09:00
Yuming Wang d67b98ea01 [SPARK-28435][SQL] Support accepting the interval keyword in the schema string
## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/7355 added support for casting between IntervalType and StringType in the Scala interface:
```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

Cast(Literal("interval 3 month 1 hours"), CalendarIntervalType).eval()
res0: Any = interval 3 months 1 hours
```
But the SQL interface does not support it:
```sql
scala> spark.sql("SELECT CAST('interval 3 month 1 hour' AS interval)").show
org.apache.spark.sql.catalyst.parser.ParseException:
DataType interval is not supported.(line 1, pos 41)

== SQL ==
SELECT CAST('interval 3 month 1 hour' AS interval)
-----------------------------------------^^^

  at org.apache.spark.sql.catalyst.parser.AstBuilder.$anonfun$visitPrimitiveDataType$1(AstBuilder.scala:1931)
  at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:108)
  at org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:1909)
  at org.apache.spark.sql.catalyst.parser.AstBuilder.visitPrimitiveDataType(AstBuilder.scala:52)
...
```

This PR adds support for accepting the `interval` keyword in the schema string, so that the SQL interface can support this feature.

## How was this patch tested?

unit tests

Closes #25189 from wangyum/SPARK-28435.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-23 19:40:57 -07:00
HyukjinKwon b83b7927b3 [SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs)
## What changes were proposed in this pull request?

This PR proposes to use `InheritableThreadLocal` instead of `ThreadLocal` for the current epoch in `EpochTracker`. Python UDFs need extra threads to write to and read from Python worker processes, and when new threads are created, the previously set epoch is lost.

After this PR, Python UDFs can be used in Structured Streaming with the continuous mode.
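
The JDK-level behaviour the fix relies on, as a standalone hedged sketch (plain threads, no Spark):

```scala
object EpochVisibilitySketch {
  def main(args: Array[String]): Unit = {
    val plain = new ThreadLocal[String]
    val inheritable = new InheritableThreadLocal[String]

    plain.set("epoch-5")
    inheritable.set("epoch-5")

    val child = new Thread(new Runnable {
      override def run(): Unit = {
        // Only the inheritable value survives into a freshly spawned child thread,
        // which mirrors why the epoch was lost on the Python UDF I/O threads.
        println(s"ThreadLocal in child: ${plain.get}")                  // null
        println(s"InheritableThreadLocal in child: ${inheritable.get}") // epoch-5
      }
    })
    child.start()
    child.join()
  }
}
```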

## How was this patch tested?

The test cases were written on top of https://github.com/apache/spark/pull/24945.
Unit tests were added.

Manual tests.

Closes #24946 from HyukjinKwon/SPARK-27234.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-24 09:59:37 +09:00
Udbhav30 86dad404bd [SPARK-28391][SQL][PYTHON][TESTS] Convert and port 'pgSQL/select_implicit.sql' into UDF test base
## What changes were proposed in this pull request?
This PR adds some tests converted from 'pgSQL/select_implicit.sql' to test UDFs
<details><summary>Diff comparing to 'pgSQL/select_implicit.sql'</summary>
<p>

```diff
... diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out
index 0675820..e6a5995 100755
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out
 -91,9 +91,11  struct<>

 -- !query 11
-SELECT c, count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
+SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY
+test_missing_target.c
+ORDER BY udf(c)
 -- !query 11 schema
-struct<c:string,count(1):bigint>
+struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 11 output
 ABAB	2
 BBBB	2
 -104,9 +106,10  cccc	2

 -- !query 12
-SELECT count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c
+SELECT udf(count(*)) FROM test_missing_target GROUP BY test_missing_target.c
+ORDER BY udf(c)
 -- !query 12 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 12 output
 2
 2
 -117,18 +120,18  struct<count(1):bigint>

 -- !query 13
-SELECT count(*) FROM test_missing_target GROUP BY a ORDER BY b
+SELECT udf(count(*)) FROM test_missing_target GROUP BY a ORDER BY udf(b)
 -- !query 13 schema
 struct<>
 -- !query 13 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [count(1)]; line 1 pos 61
+cannot resolve '`b`' given input columns: [CAST(udf(cast(count(1) as string)) AS BIGINT)]; line 1 pos 70

 -- !query 14
-SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b
+SELECT udf(count(*)) FROM test_missing_target GROUP BY b ORDER BY udf(b)
 -- !query 14 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 14 output
 1
 2
 -137,10 +140,10  struct<count(1):bigint>

 -- !query 15
-SELECT test_missing_target.b, count(*)
-  FROM test_missing_target GROUP BY b ORDER BY b
+SELECT udf(test_missing_target.b), udf(count(*))
+  FROM test_missing_target GROUP BY b ORDER BY udf(b)
 -- !query 15 schema
-struct<b:int,count(1):bigint>
+struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 15 output
 1	1
 2	2
 -149,9 +152,9  struct<b:int,count(1):bigint>

 -- !query 16
-SELECT c FROM test_missing_target ORDER BY a
+SELECT udf(c) FROM test_missing_target ORDER BY udf(a)
 -- !query 16 schema
-struct<c:string>
+struct<CAST(udf(cast(c as string)) AS STRING):string>
 -- !query 16 output
 XXXX
 ABAB
 -166,9 +169,9  CCCC

 -- !query 17
-SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b desc
+SELECT udf(count(*)) FROM test_missing_target GROUP BY b ORDER BY udf(b) desc
 -- !query 17 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 17 output
 4
 3
 -177,17 +180,17  struct<count(1):bigint>

 -- !query 18
-SELECT count(*) FROM test_missing_target ORDER BY 1 desc
+SELECT udf(count(*)) FROM test_missing_target ORDER BY udf(1) desc
 -- !query 18 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 18 output
 10

 -- !query 19
-SELECT c, count(*) FROM test_missing_target GROUP BY 1 ORDER BY 1
+SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 1 ORDER BY 1
 -- !query 19 schema
-struct<c:string,count(1):bigint>
+struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 19 output
 ABAB	2
 BBBB	2
 -198,18 +201,18  cccc	2

 -- !query 20
-SELECT c, count(*) FROM test_missing_target GROUP BY 3
+SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3
 -- !query 20 schema
 struct<>
 -- !query 20 output
 org.apache.spark.sql.AnalysisException
-GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53
+GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63

 -- !query 21
-SELECT count(*) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
-	GROUP BY b ORDER BY b
+SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
+	GROUP BY b ORDER BY udf(b)
 -- !query 21 schema
 struct<>
 -- !query 21 output
 -218,10 +221,10  Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 22
-SELECT a, a FROM test_missing_target
-	ORDER BY a
+SELECT udf(a), udf(a) FROM test_missing_target
+	ORDER BY udf(a)
 -- !query 22 schema
-struct<a:int,a:int>
+struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int>
 -- !query 22 output
 0	0
 1	1
 -236,10 +239,10  struct<a:int,a:int>

 -- !query 23
-SELECT a/2, a/2 FROM test_missing_target
-	ORDER BY a/2
+SELECT udf(udf(a)/2), udf(udf(a)/2) FROM test_missing_target
+	ORDER BY udf(udf(a)/2)
 -- !query 23 schema
-struct<(a div 2):int,(a div 2):int>
+struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int,CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int>
 -- !query 23 output
 0	0
 0	0
@@ -254,10 +257,10 @@ struct<(a div 2):int,(a div 2):int>

 -- !query 24
-SELECT a/2, a/2 FROM test_missing_target
-	GROUP BY a/2 ORDER BY a/2
+SELECT udf(a/2), udf(a/2) FROM test_missing_target
+	GROUP BY a/2 ORDER BY udf(a/2)
 -- !query 24 schema
-struct<(a div 2):int,(a div 2):int>
+struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) as string)) AS INT):int>
 -- !query 24 output
 0	0
 1	1
@@ -267,11 +270,11 @@ struct<(a div 2):int,(a div 2):int>

 -- !query 25
-SELECT x.b, count(*) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
-	GROUP BY x.b ORDER BY x.b
+SELECT udf(x.b), udf(count(*)) FROM test_missing_target x, test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
+	GROUP BY x.b ORDER BY udf(x.b)
 -- !query 25 schema
-struct<b:int,count(1):bigint>
+struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 25 output
 1	1
 2	2
@@ -280,11 +283,11 @@ struct<b:int,count(1):bigint>

 -- !query 26
-SELECT count(*) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
-	GROUP BY x.b ORDER BY x.b
+SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
+	GROUP BY x.b ORDER BY udf(x.b)
 -- !query 26 schema
-struct<count(1):bigint>
+struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint>
 -- !query 26 output
 1
 2
@@ -293,22 +296,22 @@ struct<count(1):bigint>

 -- !query 27
-SELECT a%2, count(b) FROM test_missing_target
+SELECT a%2, udf(count(udf(b))) FROM test_missing_target
 GROUP BY test_missing_target.a%2
-ORDER BY test_missing_target.a%2
+ORDER BY udf(test_missing_target.a%2)
 -- !query 27 schema
-struct<(a % 2):int,count(b):bigint>
+struct<(a % 2):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
 -- !query 27 output
 0	5
 1	5

 -- !query 28
-SELECT count(c) FROM test_missing_target
+SELECT udf(count(c)) FROM test_missing_target
 GROUP BY lower(test_missing_target.c)
-ORDER BY lower(test_missing_target.c)
+ORDER BY udf(lower(test_missing_target.c))
 -- !query 28 schema
-struct<count(c):bigint>
+struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint>
 -- !query 28 output
 2
 3
@@ -317,18 +320,18 @@ struct<count(c):bigint>

 -- !query 29
-SELECT count(a) FROM test_missing_target GROUP BY a ORDER BY b
+SELECT udf(count(udf(a))) FROM test_missing_target GROUP BY a ORDER BY udf(b)
 -- !query 29 schema
 struct<>
 -- !query 29 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`b`' given input columns: [count(a)]; line 1 pos 61
+cannot resolve '`b`' given input columns: [CAST(udf(cast(count(cast(udf(cast(a as string)) as int)) as string)) AS BIGINT)]; line 1 pos 75

 -- !query 30
-SELECT count(b) FROM test_missing_target GROUP BY b/2 ORDER BY b/2
+SELECT udf(count(b)) FROM test_missing_target GROUP BY b/2 ORDER BY udf(b/2)
 -- !query 30 schema
-struct<count(b):bigint>
+struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
 -- !query 30 output
 1
 5
@@ -336,10 +339,10 @@ struct<count(b):bigint>

 -- !query 31
-SELECT lower(test_missing_target.c), count(c)
-  FROM test_missing_target GROUP BY lower(c) ORDER BY lower(c)
+SELECT udf(lower(test_missing_target.c)), udf(count(udf(c)))
+  FROM test_missing_target GROUP BY lower(c) ORDER BY udf(lower(c))
 -- !query 31 schema
-struct<lower(c):string,count(c):bigint>
+struct<CAST(udf(cast(lower(c) as string)) AS STRING):string,CAST(udf(cast(count(cast(udf(cast(c as string)) as string)) as string)) AS BIGINT):bigint>
 -- !query 31 output
 abab	2
 bbbb	3
@@ -348,9 +351,9 @@ xxxx	1

 -- !query 32
-SELECT a FROM test_missing_target ORDER BY upper(d)
+SELECT udf(a) FROM test_missing_target ORDER BY udf(upper(udf(d)))
 -- !query 32 schema
-struct<a:int>
+struct<CAST(udf(cast(a as string)) AS INT):int>
 -- !query 32 output
 0
 1
@@ -365,19 +368,19 @@ struct<a:int>

 -- !query 33
-SELECT count(b) FROM test_missing_target
-	GROUP BY (b + 1) / 2 ORDER BY (b + 1) / 2 desc
+SELECT udf(count(b)) FROM test_missing_target
+	GROUP BY (b + 1) / 2 ORDER BY udf((b + 1) / 2) desc
 -- !query 33 schema
-struct<count(b):bigint>
+struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint>
 -- !query 33 output
 7
 3

 -- !query 34
-SELECT count(x.a) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
-	GROUP BY b/2 ORDER BY b/2
+SELECT udf(count(udf(x.a))) FROM test_missing_target x, test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
+	GROUP BY b/2 ORDER BY udf(b/2)
 -- !query 34 schema
 struct<>
 -- !query 34 output
@@ -386,11 +389,12 @@ Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10

 -- !query 35
-SELECT x.b/2, count(x.b) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
-	GROUP BY x.b/2 ORDER BY x.b/2
+SELECT udf(x.b/2), udf(count(udf(x.b))) FROM test_missing_target x,
+test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
+	GROUP BY x.b/2 ORDER BY udf(x.b/2)
 -- !query 35 schema
-struct<(b div 2):int,count(b):bigint>
+struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint>
 -- !query 35 output
 0	1
 1	5
@@ -398,14 +402,14 @@ struct<(b div 2):int,count(b):bigint>

 -- !query 36
-SELECT count(b) FROM test_missing_target x, test_missing_target y
-	WHERE x.a = y.a
+SELECT udf(count(udf(b))) FROM test_missing_target x, test_missing_target y
+	WHERE udf(x.a) = udf(y.a)
 	GROUP BY x.b/2
 -- !query 36 schema
 struct<>
 -- !query 36 output
 org.apache.spark.sql.AnalysisException
-Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 13
+Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 21

 -- !query 37
```

</p>
</details>

## How was this patch tested?
Tested as guided in SPARK-27921.
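
A minimal, self-contained sketch of the pattern being exercised (not part of this patch; the sample rows and the plain string identity UDF below are assumptions for illustration) can be run locally to see how wrapping columns and aggregates in `udf(...)` keeps the query results unchanged. Unlike the test UDF used to regenerate the golden files, this simplified UDF does not cast its result back to the original type, so the printed schema will not show the `CAST(udf(cast(... as string)) AS ...)` form seen in the diff above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local reproduction, not the actual test harness.
object MissingTargetUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-golden-file-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in rows for the test_missing_target table referenced in the diff.
    Seq((0, 1, "XXXX", "xxxx"), (1, 2, "ABAB", "abab"), (2, 2, "ABAB", "abab"))
      .toDF("a", "b", "c", "d")
      .createOrReplaceTempView("test_missing_target")

    // Identity UDF registered under the same name the rewritten queries use.
    spark.udf.register("udf", (s: String) => s)

    // One of the rewritten queries from the diff: the returned rows match the
    // original (unwrapped) query; only the schema rendering changes.
    spark.sql(
      "SELECT udf(count(*)) FROM test_missing_target GROUP BY b ORDER BY udf(b)"
    ).show(truncate = false)

    spark.stop()
  }
}
```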

Closes #25233 from Udbhav30/master.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-24 09:47:08 +09:00