## What changes were proposed in this pull request?
Adaptive execution reduces the number of post-shuffle partitions at runtime, even for shuffles caused by repartition. However, the user likely wants to get the desired number of partition when he calls repartition even in adaptive execution. This PR adds an internal config to control this and by default adaptive execution will not change the number of post-shuffle partition for repartition.
## How was this patch tested?
New tests added.
Closes#25121 from carsonwang/AE_repartition.
Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
When reordering joins EnsureRequirements only checks if all the join keys are present in the partitioning expression seq. This is problematic when the joins keys and and partitioning expressions both contain duplicates but not the same number of duplicates for each expression, e.g. `Seq(a, a, b)` vs `Seq(a, b, b)`. This fails with an index lookup failure in the `reorder` function.
This PR fixes this removing the equality checking logic from the `reorderJoinKeys` function, and by doing the multiset equality in the `reorder` function while building the reordered key sequences.
## How was this patch tested?
Added a unit test to the `PlannerSuite` and added an integration test to `JoinSuite`
Closes#25167 from hvanhovell/SPARK-27485.
Authored-by: herman <herman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
A `Filter` predicate using `PythonUDF` can't be push down into join condition, currently. A predicate like that should be able to push down to join condition. For `PythonUDF`s that can't be evaluated in join condition, `PullOutPythonUDFInJoinCondition` will pull them out later.
An example like:
```scala
val pythonTestUDF = TestPythonUDF(name = "udf")
val left = Seq((1, 2), (2, 3)).toDF("a", "b")
val right = Seq((1, 2), (3, 4)).toDF("c", "d")
val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c"))
```
Query plan before the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) Filter (pythonUDF0#2142 = pythonUDF1#2143)
+- BatchEvalPython [udf(a#2121), udf(c#2132)], [pythonUDF0#2142, pythonUDF1#2143]
+- BroadcastNestedLoopJoin BuildRight, Cross
:- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
: +- LocalTableScan [_1#2116, _2#2117]
+- BroadcastExchange IdentityBroadcastMode
+- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
+- LocalTableScan [_1#2127, _2#2128]
```
Query plan after the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) BroadcastHashJoin [pythonUDF0#2142], [pythonUDF0#2143], Cross, BuildRight
:- BatchEvalPython [udf(a#2121)], [pythonUDF0#2142]
: +- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
: +- LocalTableScan [_1#2116, _2#2117]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true]))
+- BatchEvalPython [udf(c#2132)], [pythonUDF0#2143]
+- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
+- LocalTableScan [_1#2127, _2#2128]
```
After this PR, the join can use `BroadcastHashJoin`, instead of `BroadcastNestedLoopJoin`.
## How was this patch tested?
Added tests.
Closes#25106 from viirya/pythonudf-join-condition.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR aims to correct mappings in `MsSqlServerDialect`. `ShortType` is mapped to `SMALLINT` and `FloatType` is mapped to `REAL` per [JBDC mapping]( https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types?view=sql-server-2017) respectively.
ShortType and FloatTypes are not correctly mapped to right JDBC types when using JDBC connector. This results in tables and spark data frame being created with unintended types. The issue was observed when validating against SQLServer.
Refer [JBDC mapping]( https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types?view=sql-server-2017 ) for guidance on mappings between SQLServer, JDBC and Java. Note that java "Short" type should be mapped to JDBC "SMALLINT" and java Float should be mapped to JDBC "REAL".
Some example issue that can happen because of wrong mappings
- Write from df with column type results in a SQL table of with column type as INTEGER as opposed to SMALLINT.Thus a larger table that expected.
- Read results in a dataframe with type INTEGER as opposed to ShortType
- ShortType has a problem in both the the write and read path
- FloatTypes only have an issue with read path. In the write path Spark data type 'FloatType' is correctly mapped to JDBC equivalent data type 'Real'. But in the read path when JDBC data types need to be converted to Catalyst data types ( getCatalystType) 'Real' gets incorrectly gets mapped to 'DoubleType' rather than 'FloatType'.
Refer #28151 which contained this fix as one part of a larger PR. Following PR #28151 discussion it was decided to file seperate PRs for each of the fixes.
## How was this patch tested?
UnitTest added in JDBCSuite.scala and these were tested.
Integration test updated and passed in MsSqlServerDialect.scala
E2E test done with SQLServer
Closes#25146 from shivsood/float_short_type_fix.
Authored-by: shivsood <shivsood@microsoft.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
`System.currentTimeMillis` read two times in a loop in `RateStreamContinuousPartitionReader`. If the test machine is slow enough and it spends quite some time between the `while` condition check and the `Thread.sleep` then the timeout value is negative and throws `IllegalArgumentException`.
In this PR I've fixed this issue.
## How was this patch tested?
Existing unit tests.
Closes#25162 from gaborgsomogyi/SPARK-28404.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In the PR, I propose to use the `plusMonths()` method of `LocalDate` to add months to a date. This method adds the specified amount to the months field of `LocalDate` in three steps:
1. Add the input months to the month-of-year field
2. Check if the resulting date would be invalid
3. Adjust the day-of-month to the last valid day if necessary
The difference between current behavior and propose one is in handling the last day of month in the original date. For example, adding 1 month to `2019-02-28` will produce `2019-03-28` comparing to the current implementation where the result is `2019-03-31`.
The proposed behavior is implemented in MySQL and PostgreSQL.
## How was this patch tested?
By existing test suites `DateExpressionsSuite`, `DateFunctionsSuite` and `DateTimeUtilsSuite`.
Closes#25153 from MaxGekk/add-months.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR adds some traits so that we can deduplicate initialization stuff for each type of test case. For instance, see [SPARK-28343](https://issues.apache.org/jira/browse/SPARK-28343).
It's a little bit overkill but I think it will make adding test cases easier and cause less confusions.
This PR adds both:
```
private trait PgSQLTest
private trait UDFTest
```
To indicate and share the logics related to each combination of test types.
## How was this patch tested?
Manually tested.
Closes#25155 from HyukjinKwon/SPARK-28392.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This upgraded to a newer version of Pyrolite. Most updates [1] in the newer version are for dotnot. For java, it includes a bug fix to Unpickler regarding cleaning up Unpickler memo, and support of protocol 5.
After upgrading, we can remove the fix at SPARK-27629 for the bug in Unpickler.
[1] https://github.com/irmen/Pyrolite/compare/pyrolite-4.23...master
## How was this patch tested?
Manually tested on Python 3.6 in local on existing tests.
Closes#25143 from viirya/upgrade-pyrolite.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This patch proposes moving all Trigger implementations to `Triggers.scala`, to avoid exposing these implementations to the end users and let end users only deal with `Trigger.xxx` static methods. This fits the intention of deprecation of `ProcessingTIme`, and we agree to move others without deprecation as this patch will be shipped in major version (Spark 3.0.0).
## How was this patch tested?
UTs modified to work with newly introduced class.
Closes#24996 from HeartSaVioR/SPARK-28199.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This pr enables `spark.sql.crossJoin.enabled` and `spark.sql.parser.ansi.enabled` for PostgreSQL test.
## How was this patch tested?
manual tests:
Run `test.sql` in [pgSQL](https://github.com/apache/spark/tree/master/sql/core/src/test/resources/sql-tests/inputs/pgSQL) directory and in [inputs](https://github.com/apache/spark/tree/master/sql/core/src/test/resources/sql-tests/inputs) directory:
```sql
cat <<EOF > test.sql
create or replace temporary view t1 as
select * from (values(1), (2)) as v (val);
create or replace temporary view t2 as
select * from (values(2), (1)) as v (val);
select t1.*, t2.* from t1 join t2;
EOF
```
Closes#25109 from wangyum/SPARK-28343.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Parquet may call the filter with a null value to check whether nulls are
accepted. While it seems Spark avoids that path in Parquet with 1.10, in
1.11 that causes Spark unit tests to fail.
Tested with Parquet 1.11 (and new unit test).
Closes#25140 from vanzin/SPARK-28371.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This patch fixes the flaky test "query without test harness" on ContinuousSuite, via adding some more gaps on waiting query to commit the epoch which writes output rows.
The observation of this issue is below (injected some debug logs to get them):
```
reader creation time 1562225320210
epoch 1 launched 1562225320593 (+380ms from reader creation time)
epoch 13 launched 1562225321702 (+1.5s from reader creation time)
partition reader creation time 1562225321715 (+1.5s from reader creation time)
next read time for first next call 1562225321210 (+1s from reader creation time)
first next called in partition reader 1562225321746 (immediately after creation of partition reader)
wait finished in next called in partition reader 1562225321746 (no wait)
second next called in partition reader 1562225321747 (immediately after first next())
epoch 0 commit started 1562225321861
writing rows (0, 1) (belong to epoch 13) 1562225321866 (+100ms after first next())
wait start in waitForRateSourceTriggers(2) 1562225322059
next read time for second next call 1562225322210 (+1s from previous "next read time")
wait finished in next called in partition reader 1562225322211 (+450ms wait)
writing rows (2, 3) (belong to epoch 13) 1562225322211 (immediately after next())
epoch 14 launched 1562225322246
desired wait time in waitForRateSourceTriggers(2) 1562225322510 (+2.3s from reader creation time)
epoch 12 committed 1562225323034
```
These rows were written within desired wait time, but the epoch 13 couldn't be committed within it. Interestingly, epoch 12 was lucky to be committed within a gap between finished waiting in waitForRateSourceTriggers and query.stop() - but even suppose the rows were written in epoch 12, it would be just in luck and epoch should be committed within desired wait time.
This patch modifies Rate continuous stream to track the highest committed value, so that test can wait until desired value is reported to the stream as committed.
This patch also modifies Rate continuous stream to track the timestamp at stream gets the first committed offset, and let `waitForRateSourceTriggers` use the timestamp. This also relies on waiting for specific period, but safer approach compared to current based on the observation above. Based on the change, this patch saves couple of seconds in test time.
## How was this patch tested?
10 sequential test runs succeeded locally.
Closes#25048 from HeartSaVioR/SPARK-28247.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
A code gen test in WholeStageCodeGenSuite was flaky because it used the codegen metrics class to test if the generated code for equivalent plans was identical under a particular flag. This patch switches the test to compare the generated code directly.
N/A
Closes#25131 from gatorsmile/WholeStageCodegenSuite.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR adds compatibility of handling a `WITH` clause within another `WITH` cause. Before this PR these queries retuned `1` while after this PR they return `2` as PostgreSQL does:
```
WITH
t AS (SELECT 1),
t2 AS (
WITH t AS (SELECT 2)
SELECT * FROM t
)
SELECT * FROM t2
```
```
WITH t AS (SELECT 1)
SELECT (
WITH t AS (SELECT 2)
SELECT * FROM t
)
```
As this is an incompatible change, the PR introduces the `spark.sql.legacy.cte.substitution.enabled` flag as an option to restore old behaviour.
## How was this patch tested?
Added new UTs.
Closes#25029 from peter-toth/SPARK-28228.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
There are some hardcoded configs, using config entry to replace them.
## How was this patch tested?
Existing UT
Closes#25059 from WangGuangxin/ConfigEntry.
Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
0-args Java UDF alone calls the function even before making it as an expression.
It causes that the function always returns the same value and the function is called at driver side.
Seems like a mistake.
## How was this patch tested?
Unit test was added
Closes#25108 from HyukjinKwon/SPARK-28321.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Implement `ALTER TABLE` for v2 tables:
* Add `AlterTable` logical plan and `AlterTableExec` physical plan
* Convert `ALTER TABLE` parsed plans to `AlterTable` when a v2 catalog is responsible for an identifier
* Validate that columns to alter exist in analyzer checks
* Fix nested type handling in `CatalogV2Util`
## How was this patch tested?
* Add extensive tests in `DataSourceV2SQLSuite`
Closes#24937 from rdblue/SPARK-28139-add-v2-alter-table.
Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: Ryan Blue <rdblue@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Cleaned up (removed) code duplication in `ObjectProducerExec` operators so they use the trait's methods.
## How was this patch tested?
Local build. Waiting for Jenkins.
Closes#25065 from jaceklaskowski/ObjectProducerExec-operators-cleanup.
Authored-by: Jacek Laskowski <jacek@japila.pl>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This is a second part of the https://issues.apache.org/jira/browse/SPARK-27396 and a follow on to #24795
## How was this patch tested?
I did some manual tests and ran/updated the automated tests
I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.
Closes#25008 from revans2/columnar-remove-batch-scan.
Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
The tests added at https://github.com/apache/spark/pull/25069 seem flaky in some environments. See https://github.com/apache/spark/pull/25069#issuecomment-510338469
Python's string representation of floats can make the tests flaky. See https://docs.python.org/3/tutorial/floatingpoint.html.
I think it's just better to explicitly cast everywhere udf returns a float (or a double) to stay safe. (note that we're not targeting the Python <> Scala value conversions - there are inevitable differences between Python and Scala; therefore, other languages' UDFs cannot guarantee the same results between Python and Scala).
This PR proposes to cast cases to long, integer and decimal explicitly to make the test cases robust.
<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d55869..734634b7388 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
-3,23 +3,23
-- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek
-- !query 0 schema
-struct<avg_1:double>
+struct<avg_1:decimal(10,3)>
-- !query 0 output
1.5
-- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100
-- !query 1 schema
-struct<avg_32:double>
+struct<avg_32:decimal(10,3)>
-- !query 1 output
-32.666666666666664
+32.667
-- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
-- !query 2 schema
struct<avg_107_943:decimal(10,3)>
-- !query 2 output
-27,39 +27,39 struct<avg_107_943:decimal(10,3)>
-- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek
-- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:int>
-- !query 3 output
1500
-- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
-- !query 4 schema
-struct<sum_198:bigint>
+struct<sum_198:string>
-- !query 4 output
198
-- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest
-- !query 5 schema
-struct<avg_431_773:double>
+struct<avg_431_773:decimal(10,3)>
-- !query 5 output
-431.77260909229517
+431.773
-- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
-- !query 6 schema
-struct<max_3:int>
+struct<max_3:string>
-- !query 6 output
3
-- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest
-- !query 7 schema
struct<max_100:int>
-- !query 7 output
-67,245 +67,246 struct<max_100:int>
-- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest
-- !query 8 schema
-struct<max_324_78:float>
+struct<max_324_78:decimal(10,3)>
-- !query 8 output
324.78
-- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest
-- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 9 output
-131.10703231895047
+131.107
-- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest
-- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(stddev_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 10 output
-151.38936080399804
+151.389
-- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest
-- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 11 output
-17189.053923482323
+17189.054
-- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest
-- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<CAST(udf(var_samp(cast(b as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 12 output
-22918.738564643096
+22918.739
-- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 13 output
-131.18117242958306
+131.181
-- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest
-- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 14 output
-151.47497042966097
+151.475
-- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(cast(b as decimal(38,0)) as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 15 output
17208.5
-- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest
-- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 16 output
-22944.666666666668
+22944.667
-- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0))
-- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<CAST(udf(var_pop(cast(1.0 as double))) AS INT):int,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
-- !query 17 output
-0.0 NaN
+0 NaN
-- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
-- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)) AS INT):int,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
-- !query 18 output
-0.0 NaN
+0 NaN
-- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
-- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(null as int)) AS DOUBLE)):double>
-- !query 19 output
NULL
-- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
-- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
-- !query 20 output
NULL
-- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
-- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
-- !query 21 output
NULL
-- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
-- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(null as double)) AS DOUBLE)):double>
-- !query 22 output
NULL
-- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
-- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(null as int)) AS DOUBLE)):double>
-- !query 23 output
NULL
-- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
-- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
-- !query 24 output
NULL
-- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
-- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
-- !query 25 output
NULL
-- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
-- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(null as double)) AS DOUBLE)):double>
-- !query 26 output
NULL
-- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
-- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
-- !query 27 output
NaN
-- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
-- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
-- !query 28 output
NaN
-- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('Infinity'), ('1')) v(x)
-- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 30 output
Infinity NaN
-- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('Infinity'), ('Infinity')) v(x)
-- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 31 output
Infinity NaN
-- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
-- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 32 output
NaN NaN
-- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
-- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS INT):int,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 33 output
-1.00000005E8 2.5
+100000005 2.5
-- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))
FROM (VALUES (7000000000005), (7000000000007)) v(x)
-- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<CAST(avg(CAST(udf(cast(x as double)) AS DOUBLE)) AS BIGINT):bigint,CAST(udf(var_pop(cast(x as double))) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 34 output
-7.000000000006E12 1.0
+7000000000006 1
-- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest
-- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS DECIMAL(10,3)):decimal(10,3),CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 35 output
-653.6289553875104 871.5052738500139
+653.629 871.505
-- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest
-- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)) AS DECIMAL(10,3)):decimal(10,3)>
-- !query 36 output
-0.1396345165178734
+0.14
-- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
-- !query 37 schema
struct<cnt_1000:bigint>
-- !query 37 output
-313,18 +314,18 struct<cnt_1000:bigint>
-- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
-- !query 38 schema
-struct<cnt_4:bigint>
+struct<cnt_4:string>
-- !query 38 output
4
-- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek
group by ten order by ten
-- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,udf(count(1)):string,CAST(sum(CAST(udf(four) AS DOUBLE)) AS INT):int>
-- !query 39 output
0 100 100
1 100 200
-339,10 +340,10 struct<ten:int,count(1):bigint,sum(four):bigint>
-- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
group by ten order by ten
-- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as bigint))):string>
-- !query 40 output
0 100 2
1 100 4
-357,11 +358,11 struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
-- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
-- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,udf(sum(distinct cast(four as bigint))):string>
-- !query 41 output
0 2
2 2
-374,23 +375,23 struct<ten:int,sum(DISTINCT four):bigint>
select ten, sum(distinct four) from onek a
group by ten
having exists (select 1 from onek b
- where sum(distinct a.four + b.four) = b.four)
+ where sum(distinct a.four + b.four) = udf(b.four))
-- !query 42 schema
struct<>
-- !query 42 output
org.apache.spark.sql.AnalysisException
Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))]
Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];
-- !query 43
select
- (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+ (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
from tenk1 o
-- !query 43 schema
struct<>
-- !query 43 output
org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
```
</p>
</details>
## How was this patch tested?
Manually tested in local.
Also, with JDK 11:
```
Using /.../jdk-11.0.3.jdk/Contents/Home as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /.../spark/project
[info] Updating {file:/.../spark/project/}spark-build...
...
[info] SQLQueryTestSuite:
...
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scala UDF (17 seconds, 228 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Regular Python UDF (36 seconds, 170 milliseconds)
[info] - udf/pgSQL/udf-aggregates_part1.sql - Scalar Pandas UDF (41 seconds, 132 milliseconds)
...
```
Closes#25110 from HyukjinKwon/SPARK-28270-1.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR proposes to replace `REL_12_BETA1` to `REL_12_BETA2` which is latest.
## How was this patch tested?
Manually checked each link and checked via `git grep -r REL_12_BETA1` as well.
Closes#25105 from HyukjinKwon/SPARK-28342.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
The new adaptive execution framework introduced configuration `spark.sql.runtime.reoptimization.enabled`. We now rename it back to `spark.sql.adaptive.enabled` as the umbrella configuration for adaptive execution.
## How was this patch tested?
Existing tests.
Closes#25102 from carsonwang/renameAE.
Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR adds some tests converted from `pgSQL/aggregates_part1.sql'` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
This PR also contains two minor fixes:
1. Change name of Scala UDF from `UDF:name(...)` to `name(...)` to be consistent with Python'
2. Fix Scala UDF at `IntegratedUDFTestUtils.scala ` to handle `null` in strings.
<details><summary>Diff comparing to 'pgSQL/aggregates_part1.sql'</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
index 51ca1d55869..124fdd6416e 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
-3,7 +3,7
-- !query 0
-SELECT avg(four) AS avg_1 FROM onek
+SELECT avg(udf(four)) AS avg_1 FROM onek
-- !query 0 schema
struct<avg_1:double>
-- !query 0 output
-11,15 +11,15 struct<avg_1:double>
-- !query 1
-SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100
+SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100
-- !query 1 schema
-struct<avg_32:double>
+struct<avg_32:string>
-- !query 1 output
32.666666666666664
-- !query 2
-select CAST(avg(b) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
+select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest
-- !query 2 schema
struct<avg_107_943:decimal(10,3)>
-- !query 2 output
-27,285 +27,286 struct<avg_107_943:decimal(10,3)>
-- !query 3
-SELECT sum(four) AS sum_1500 FROM onek
+SELECT sum(udf(four)) AS sum_1500 FROM onek
-- !query 3 schema
-struct<sum_1500:bigint>
+struct<sum_1500:double>
-- !query 3 output
-1500
+1500.0
-- !query 4
-SELECT sum(a) AS sum_198 FROM aggtest
+SELECT udf(sum(a)) AS sum_198 FROM aggtest
-- !query 4 schema
-struct<sum_198:bigint>
+struct<sum_198:string>
-- !query 4 output
198
-- !query 5
-SELECT sum(b) AS avg_431_773 FROM aggtest
+SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest
-- !query 5 schema
-struct<avg_431_773:double>
+struct<avg_431_773:string>
-- !query 5 output
431.77260909229517
-- !query 6
-SELECT max(four) AS max_3 FROM onek
+SELECT udf(max(four)) AS max_3 FROM onek
-- !query 6 schema
-struct<max_3:int>
+struct<max_3:string>
-- !query 6 output
3
-- !query 7
-SELECT max(a) AS max_100 FROM aggtest
+SELECT max(udf(a)) AS max_100 FROM aggtest
-- !query 7 schema
-struct<max_100:int>
+struct<max_100:string>
-- !query 7 output
-100
+56
-- !query 8
-SELECT max(aggtest.b) AS max_324_78 FROM aggtest
+SELECT CAST(udf(udf(max(aggtest.b))) AS int) AS max_324_78 FROM aggtest
-- !query 8 schema
-struct<max_324_78:float>
+struct<max_324_78:int>
-- !query 8 output
-324.78
+324
-- !query 9
-SELECT stddev_pop(b) FROM aggtest
+SELECT CAST(stddev_pop(udf(b)) AS int) FROM aggtest
-- !query 9 schema
-struct<stddev_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(stddev_pop(CAST(udf(b) AS DOUBLE)) AS INT):int>
-- !query 9 output
-131.10703231895047
+131
-- !query 10
-SELECT stddev_samp(b) FROM aggtest
+SELECT udf(stddev_samp(b)) FROM aggtest
-- !query 10 schema
-struct<stddev_samp(CAST(b AS DOUBLE)):double>
+struct<udf(stddev_samp(cast(b as double))):string>
-- !query 10 output
151.38936080399804
-- !query 11
-SELECT var_pop(b) FROM aggtest
+SELECT CAST(var_pop(udf(b)) as int) FROM aggtest
-- !query 11 schema
-struct<var_pop(CAST(b AS DOUBLE)):double>
+struct<CAST(var_pop(CAST(udf(b) AS DOUBLE)) AS INT):int>
-- !query 11 output
-17189.053923482323
+17189
-- !query 12
-SELECT var_samp(b) FROM aggtest
+SELECT udf(var_samp(b)) FROM aggtest
-- !query 12 schema
-struct<var_samp(CAST(b AS DOUBLE)):double>
+struct<udf(var_samp(cast(b as double))):string>
-- !query 12 output
22918.738564643096
-- !query 13
-SELECT stddev_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
-- !query 13 schema
-struct<stddev_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))):string>
-- !query 13 output
131.18117242958306
-- !query 14
-SELECT stddev_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest
-- !query 14 schema
-struct<stddev_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)):double>
-- !query 14 output
151.47497042966097
-- !query 15
-SELECT var_pop(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
-- !query 15 schema
-struct<var_pop(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<udf(var_pop(cast(cast(b as decimal(38,0)) as double))):string>
-- !query 15 output
17208.5
-- !query 16
-SELECT var_samp(CAST(b AS Decimal(38,0))) FROM aggtest
+SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest
-- !query 16 schema
-struct<var_samp(CAST(CAST(b AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)):double>
-- !query 16 output
22944.666666666668
-- !query 17
-SELECT var_pop(1.0), var_samp(2.0)
+SELECT udf(var_pop(1.0)), var_samp(udf(2.0))
-- !query 17 schema
-struct<var_pop(CAST(1.0 AS DOUBLE)):double,var_samp(CAST(2.0 AS DOUBLE)):double>
+struct<udf(var_pop(cast(1.0 as double))):string,var_samp(CAST(udf(2.0) AS DOUBLE)):double>
-- !query 17 output
0.0 NaN
-- !query 18
-SELECT stddev_pop(CAST(3.0 AS Decimal(38,0))), stddev_samp(CAST(4.0 AS Decimal(38,0)))
+SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
-- !query 18 schema
-struct<stddev_pop(CAST(CAST(3.0 AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(4.0 AS DECIMAL(38,0)) AS DOUBLE)):double>
+struct<stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS DOUBLE)):double,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS DOUBLE)):double>
-- !query 18 output
0.0 NaN
-- !query 19
-select sum(CAST(null AS int)) from range(1,4)
+select sum(udf(CAST(null AS int))) from range(1,4)
-- !query 19 schema
-struct<sum(CAST(NULL AS INT)):bigint>
+struct<sum(CAST(udf(cast(null as int)) AS DOUBLE)):double>
-- !query 19 output
NULL
-- !query 20
-select sum(CAST(null AS long)) from range(1,4)
+select sum(udf(CAST(null AS long))) from range(1,4)
-- !query 20 schema
-struct<sum(CAST(NULL AS BIGINT)):bigint>
+struct<sum(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
-- !query 20 output
NULL
-- !query 21
-select sum(CAST(null AS Decimal(38,0))) from range(1,4)
+select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
-- !query 21 schema
-struct<sum(CAST(NULL AS DECIMAL(38,0))):decimal(38,0)>
+struct<sum(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
-- !query 21 output
NULL
-- !query 22
-select sum(CAST(null AS DOUBLE)) from range(1,4)
+select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
-- !query 22 schema
-struct<sum(CAST(NULL AS DOUBLE)):double>
+struct<sum(CAST(udf(cast(null as double)) AS DOUBLE)):double>
-- !query 22 output
NULL
-- !query 23
-select avg(CAST(null AS int)) from range(1,4)
+select avg(udf(CAST(null AS int))) from range(1,4)
-- !query 23 schema
-struct<avg(CAST(NULL AS INT)):double>
+struct<avg(CAST(udf(cast(null as int)) AS DOUBLE)):double>
-- !query 23 output
NULL
-- !query 24
-select avg(CAST(null AS long)) from range(1,4)
+select avg(udf(CAST(null AS long))) from range(1,4)
-- !query 24 schema
-struct<avg(CAST(NULL AS BIGINT)):double>
+struct<avg(CAST(udf(cast(null as bigint)) AS DOUBLE)):double>
-- !query 24 output
NULL
-- !query 25
-select avg(CAST(null AS Decimal(38,0))) from range(1,4)
+select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
-- !query 25 schema
-struct<avg(CAST(NULL AS DECIMAL(38,0))):decimal(38,4)>
+struct<avg(CAST(udf(cast(null as decimal(38,0))) AS DOUBLE)):double>
-- !query 25 output
NULL
-- !query 26
-select avg(CAST(null AS DOUBLE)) from range(1,4)
+select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
-- !query 26 schema
-struct<avg(CAST(NULL AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(null as double)) AS DOUBLE)):double>
-- !query 26 output
NULL
-- !query 27
-select sum(CAST('NaN' AS DOUBLE)) from range(1,4)
+select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
-- !query 27 schema
-struct<sum(CAST(NaN AS DOUBLE)):double>
+struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
-- !query 27 output
NaN
-- !query 28
-select avg(CAST('NaN' AS DOUBLE)) from range(1,4)
+select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
-- !query 28 schema
-struct<avg(CAST(NaN AS DOUBLE)):double>
+struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
-- !query 28 output
NaN
-- !query 29
SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
-FROM (VALUES (CAST('1' AS DOUBLE)), (CAST('Infinity' AS DOUBLE))) v(x)
+FROM (VALUES (CAST(udf('1') AS DOUBLE)), (CAST(udf('Infinity') AS DOUBLE))) v(x)
-- !query 29 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<>
-- !query 29 output
-Infinity NaN
+org.apache.spark.sql.AnalysisException
+cannot evaluate expression CAST(udf(1) AS DOUBLE) in inline table definition; line 2 pos 14
-- !query 30
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('Infinity'), ('1')) v(x)
-- !query 30 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 30 output
Infinity NaN
-- !query 31
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('Infinity'), ('Infinity')) v(x)
-- !query 31 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 31 output
Infinity NaN
-- !query 32
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
-- !query 32 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS DOUBLE)):double>
-- !query 32 output
NaN NaN
-- !query 33
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
-- !query 33 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(x as double)) AS DOUBLE)):double,udf(var_pop(cast(x as double))):string>
-- !query 33 output
1.00000005E8 2.5
-- !query 34
-SELECT avg(CAST(x AS DOUBLE)), var_pop(CAST(x AS DOUBLE))
+SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
FROM (VALUES (7000000000005), (7000000000007)) v(x)
-- !query 34 schema
-struct<avg(CAST(x AS DOUBLE)):double,var_pop(CAST(x AS DOUBLE)):double>
+struct<avg(CAST(udf(cast(x as double)) AS DOUBLE)):double,udf(var_pop(cast(x as double))):string>
-- !query 34 output
7.000000000006E12 1.0
-- !query 35
-SELECT covar_pop(b, a), covar_samp(b, a) FROM aggtest
+SELECT CAST(udf(covar_pop(b, udf(a))) AS int), CAST(covar_samp(udf(b), a) as int) FROM aggtest
-- !query 35 schema
-struct<covar_pop(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double,covar_samp(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<CAST(udf(covar_pop(cast(b as double), cast(udf(a) as double))) AS INT):int,CAST(covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)) AS INT):int>
-- !query 35 output
-653.6289553875104 871.5052738500139
+653 871
-- !query 36
-SELECT corr(b, a) FROM aggtest
+SELECT corr(b, udf(a)) FROM aggtest
-- !query 36 schema
-struct<corr(CAST(b AS DOUBLE), CAST(a AS DOUBLE)):double>
+struct<corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)):double>
-- !query 36 output
0.1396345165178734
-- !query 37
-SELECT count(four) AS cnt_1000 FROM onek
+SELECT count(udf(four)) AS cnt_1000 FROM onek
-- !query 37 schema
struct<cnt_1000:bigint>
-- !query 37 output
-313,36 +314,36 struct<cnt_1000:bigint>
-- !query 38
-SELECT count(DISTINCT four) AS cnt_4 FROM onek
+SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek
-- !query 38 schema
-struct<cnt_4:bigint>
+struct<cnt_4:string>
-- !query 38 output
4
-- !query 39
-select ten, count(*), sum(four) from onek
+select ten, udf(count(*)), sum(udf(four)) from onek
group by ten order by ten
-- !query 39 schema
-struct<ten:int,count(1):bigint,sum(four):bigint>
+struct<ten:int,udf(count(1)):string,sum(CAST(udf(four) AS DOUBLE)):double>
-- !query 39 output
-0 100 100
-1 100 200
-2 100 100
-3 100 200
-4 100 100
-5 100 200
-6 100 100
-7 100 200
-8 100 100
-9 100 200
+0 100 100.0
+1 100 200.0
+2 100 100.0
+3 100 200.0
+4 100 100.0
+5 100 200.0
+6 100 100.0
+7 100 200.0
+8 100 100.0
+9 100 200.0
-- !query 40
-select ten, count(four), sum(DISTINCT four) from onek
+select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
group by ten order by ten
-- !query 40 schema
-struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
+struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as bigint))):string>
-- !query 40 output
0 100 2
1 100 4
-357,11 +358,11 struct<ten:int,count(four):bigint,sum(DISTINCT four):bigint>
-- !query 41
-select ten, sum(distinct four) from onek a
+select ten, udf(sum(distinct four)) from onek a
group by ten
-having exists (select 1 from onek b where sum(distinct a.four) = b.four)
+having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four)
-- !query 41 schema
-struct<ten:int,sum(DISTINCT four):bigint>
+struct<ten:int,udf(sum(distinct cast(four as bigint))):string>
-- !query 41 output
0 2
2 2
-374,23 +375,23 struct<ten:int,sum(DISTINCT four):bigint>
select ten, sum(distinct four) from onek a
group by ten
having exists (select 1 from onek b
- where sum(distinct a.four + b.four) = b.four)
+ where sum(distinct a.four + b.four) = udf(b.four))
-- !query 42 schema
struct<>
-- !query 42 output
org.apache.spark.sql.AnalysisException
Aggregate/Window/Generate expressions are not valid in where clause of the query.
-Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(b.`four` AS BIGINT))]
+Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))]
Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];
-- !query 43
select
- (select max((select i.unique2 from tenk1 i where i.unique1 = o.unique1)))
+ (select udf(max((select i.unique2 from tenk1 i where i.unique1 = o.unique1))))
from tenk1 o
-- !query 43 schema
struct<>
-- !query 43 output
org.apache.spark.sql.AnalysisException
-cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+cannot resolve '`o.unique1`' given input columns: [i.even, i.fivethous, i.four, i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
```
</p>
</details>
Note that, currently, `IntegratedUDFTestUtils.scala`'s UDFs only return strings. There are some differences between those UDFs (Scala, Pandas and Python):
- Python's string representation of floats can make the tests flaky. (See https://docs.python.org/3/tutorial/floatingpoint.html). To work around this, I had to `CAST(... as int)`.
- There are string representation differences between `Inf` `-Inf` <> `Infinity` `-Infinity` and `nan` <> `NaN`
- Maybe we should add other type versions of UDFs if this makes adding tests difficult.
Note that one issue found - [SPARK-28291](https://issues.apache.org/jira/browse/SPARK-28291). The test was commented for now.
## How was this patch tested?
Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
Closes#25069 from HyukjinKwon/SPARK-28270.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Fix `stringToDate()` for the formats `yyyy` and `yyyy-[m]m` that assumes there are no additional chars after the last components `yyyy` and `[m]m`. In the PR, I propose to check that entire input was consumed for the formats.
After the fix, the input `1999 08 01` will be invalid because it matches to the pattern `yyyy` but the strings contains additional chars ` 08 01`.
Since Spark 1.6.3 ~ 2.4.3, the behavior is the same.
```
spark-sql> SELECT CAST('1999 08 01' AS DATE);
1999-01-01
```
This PR makes it return NULL like Hive.
```
spark-sql> SELECT CAST('1999 08 01' AS DATE);
NULL
```
## How was this patch tested?
Added new checks to `DateTimeUtilsSuite` for the `1999 08 01` and `1999 08` inputs.
Closes#25097 from MaxGekk/spark-28015-invalid-date-format.
Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This fixes a problem where it is possible to create a v2 table using the default catalog that cannot be loaded with the session catalog. A session catalog should be used when the v1 catalog is responsible for tables with no catalog in the table identifier.
* Adds a v2 catalog implementation that delegates to the analyzer's SessionCatalog
* Uses the v2 session catalog for CTAS and CreateTable when the provider is a v2 provider and no v2 catalog is in the table identifier
* Updates catalog lookup to always provide the default if it is set for consistent behavior
## How was this patch tested?
* Adds a new test suite for the v2 session catalog that validates the TableCatalog API
* Adds test cases in PlanResolutionSuite to validate the v2 session catalog is used
* Adds test suite for LookupCatalog with a default catalog
Closes#24768 from rdblue/SPARK-27919-add-v2-session-catalog.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR adds some tests converted from having.sql to test UDFs following the combination guide in [SPARK-27921](url)
<details><summary>Diff comparing to 'having.sql'</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out
index d87ee52216..7cea2e5128 100644
--- a/sql/core/src/test/resources/sql-tests/results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out
-16,34 +16,34 struct<>
-- !query 1
-SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2
+SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2
-- !query 1 schema
-struct<k:string,sum(v):bigint>
+struct<k:string,udf(sum(cast(v as bigint))):string>
-- !query 1 output
one 6
three 3
-- !query 2
-SELECT count(k) FROM hav GROUP BY v + 1 HAVING v + 1 = 2
+SELECT udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2)
-- !query 2 schema
-struct<count(k):bigint>
+struct<udf(count(udf(k))):string>
-- !query 2 output
1
-- !query 3
-SELECT MIN(t.v) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(COUNT(1) > 0)
+SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0)
-- !query 3 schema
-struct<min(v):int>
+struct<udf(min(v)):string>
-- !query 3 output
1
-- !query 4
-SELECT a + b FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > 1
+SELECT udf(a + b) FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > udf(1)
-- !query 4 schema
-struct<(a + CAST(b AS BIGINT)):bigint>
+struct<udf((a + cast(b as bigint))):string>
-- !query 4 output
3
7
```
</p>
</details>
## How was this patch tested?
Tested as guided in SPARK-27921.
Closes#25093 from huaxingao/spark-28281.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR adds some tests converted from `natural-join.sql` to test UDFs following the combination guide in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
<details><summary>Diff results comparing to `natural-join.sql`</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.
sql.out
index 43f2f9a..53ef177 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out
-27,7 +27,7 struct<>
-- !query 2
-SELECT * FROM nt1 natural join nt2 where k = "one"
+SELECT * FROM nt1 natural join nt2 where udf(k) = "one"
-- !query 2 schema
struct<k:string,v1:int,v2:int>
-- !query 2 output
-36,7 +36,7 one 1 5
-- !query 3
-SELECT * FROM nt1 natural left join nt2 order by v1, v2
+SELECT * FROM nt1 natural left join nt2 where k <> udf("") order by v1, v2
-- !query 3 schema
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.
sql.out
index 43f2f9a..53ef177 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out
-27,7 +27,7 struct<>
-- !query 2
-SELECT * FROM nt1 natural join nt2 where k = "one"
+SELECT * FROM nt1 natural join nt2 where udf(k) = "one"
-- !query 2 schema
struct<k:string,v1:int,v2:int>
-- !query 2 output
-36,7 +36,7 one 1 5
-- !query 3
-SELECT * FROM nt1 natural left join nt2 order by v1, v2
+SELECT * FROM nt1 natural left join nt2 where k <> udf("") order by v1, v2
-- !query 3 schema
struct<k:string,v1:int,v2:int>
```
</p>
</details>
## How was this patch tested?
Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
Closes#25088 from manuzhang/SPARK-27922.
Authored-by: manu.zhang <manu.zhang@vipshop.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
There is a bug in `ExtractPythonUDFs` that produces wrong result attributes. It causes a failure when using `PythonUDF`s among multiple child plans, e.g., join. An example is using `PythonUDF`s in join condition.
```python
>>> left = spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
>>> right = spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
>>> f = udf(lambda a: a, IntegerType())
>>> df = left.join(right, [f("a") == f("b"), left.a1 == right.b1])
>>> df.collect()
19/07/10 12:20:49 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70)
...
```
## How was this patch tested?
Added test.
Closes#25091 from viirya/SPARK-28323.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
This is a followup of the discussion in https://github.com/apache/spark/pull/24675#discussion_r286786053
`QueryPlan#references` is an important property. The `ColumnPrunning` rule relies on it.
Some query plan nodes have `Seq[Attribute]` parameter, which is used as its output attributes. For example, leaf nodes, `Generate`, `MapPartitionsInPandas`, etc. These nodes override `producedAttributes` to make `missingInputs` correct.
However, these nodes also need to override `references` to make column pruning work. This PR proposes to exclude `producedAttributes` from the default implementation of `QueryPlan#references`, so that we don't need to override `references` in all these nodes.
Note that, technically we can remove `producedAttributes` and always ask query plan nodes to override `references`. But I do find the code can be simpler with `producedAttributes` in some places, where there is a base class for some specific query plan nodes.
## How was this patch tested?
existing tests
Closes#25052 from cloud-fan/minor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR adds some tests converted from `pgSQL/case.sql'` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
This PR also contains two minor fixes:
1. Change name of Scala UDF from `UDF:name(...)` to `name(...)` to be consistent with Python'
2. Fix Scala UDF at `IntegratedUDFTestUtils.scala ` to handle `null` in strings.
<details><summary>Diff comparing to 'pgSQL/case.sql'</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
index fa078d16d6d..55bef64338f 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
-115,7 +115,7 struct<>
-- !query 13
SELECT '3' AS `One`,
CASE
- WHEN 1 < 2 THEN 3
+ WHEN CAST(udf(1 < 2) AS boolean) THEN 3
END AS `Simple WHEN`
-- !query 13 schema
struct<One:string,Simple WHEN:int>
-126,10 +126,10 struct<One:string,Simple WHEN:int>
-- !query 14
SELECT '<NULL>' AS `One`,
CASE
- WHEN 1 > 2 THEN 3
+ WHEN 1 > 2 THEN udf(3)
END AS `Simple default`
-- !query 14 schema
-struct<One:string,Simple default:int>
+struct<One:string,Simple default:string>
-- !query 14 output
<NULL> NULL
-137,17 +137,17 struct<One:string,Simple default:int>
-- !query 15
SELECT '3' AS `One`,
CASE
- WHEN 1 < 2 THEN 3
- ELSE 4
+ WHEN udf(1) < 2 THEN udf(3)
+ ELSE udf(4)
END AS `Simple ELSE`
-- !query 15 schema
-struct<One:string,Simple ELSE:int>
+struct<One:string,Simple ELSE:string>
-- !query 15 output
3 3
-- !query 16
-SELECT '4' AS `One`,
+SELECT udf('4') AS `One`,
CASE
WHEN 1 > 2 THEN 3
ELSE 4
-159,10 +159,10 struct<One:string,ELSE default:int>
-- !query 17
-SELECT '6' AS `One`,
+SELECT udf('6') AS `One`,
CASE
- WHEN 1 > 2 THEN 3
- WHEN 4 < 5 THEN 6
+ WHEN CAST(udf(1 > 2) AS boolean) THEN 3
+ WHEN udf(4) < 5 THEN 6
ELSE 7
END AS `Two WHEN with default`
-- !query 17 schema
-173,7 +173,7 struct<One:string,Two WHEN with default:int>
-- !query 18
SELECT '7' AS `None`,
- CASE WHEN rand() < 0 THEN 1
+ CASE WHEN rand() < udf(0) THEN 1
END AS `NULL on no matches`
-- !query 18 schema
struct<None:string,NULL on no matches:int>
-182,36 +182,36 struct<None:string,NULL on no matches:int>
-- !query 19
-SELECT CASE WHEN 1=0 THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
+SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
-- !query 19 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN CAST(udf((1 = 0)) AS BOOLEAN) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
-- !query 19 output
1.0
-- !query 20
-SELECT CASE 1 WHEN 0 THEN 1/0 WHEN 1 THEN 1 ELSE 2/0 END
+SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END
-- !query 20 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(CAST(udf(0) AS DOUBLE) AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
-- !query 20 output
1.0
-- !query 21
-SELECT CASE WHEN i > 100 THEN 1/0 ELSE 0 END FROM case_tbl
+SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl
-- !query 21 schema
-struct<CASE WHEN (i > 100) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) ELSE CAST(0 AS DOUBLE) END:double>
+struct<CASE WHEN (i > 100) THEN udf((cast(1 as double) / cast(0 as double))) ELSE udf(0) END:string>
-- !query 21 output
-0.0
-0.0
-0.0
-0.0
+0
+0
+0
+0
-- !query 22
-SELECT CASE 'a' WHEN 'a' THEN 1 ELSE 2 END
+SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END
-- !query 22 schema
-struct<CASE WHEN (a = a) THEN 1 ELSE 2 END:int>
+struct<CASE WHEN (a = a) THEN udf(1) ELSE udf(2) END:string>
-- !query 22 output
1
-283,7 +283,7 big
-- !query 27
-SELECT * FROM CASE_TBL WHERE COALESCE(f,i) = 4
+SELECT * FROM CASE_TBL WHERE udf(COALESCE(f,i)) = 4
-- !query 27 schema
struct<i:int,f:double>
-- !query 27 output
-291,7 +291,7 struct<i:int,f:double>
-- !query 28
-SELECT * FROM CASE_TBL WHERE NULLIF(f,i) = 2
+SELECT * FROM CASE_TBL WHERE udf(NULLIF(f,i)) = 2
-- !query 28 schema
struct<i:int,f:double>
-- !query 28 output
-299,10 +299,10 struct<i:int,f:double>
-- !query 29
-SELECT COALESCE(a.f, b.i, b.j)
+SELECT udf(COALESCE(a.f, b.i, b.j))
FROM CASE_TBL a, CASE2_TBL b
-- !query 29 schema
-struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
+struct<udf(coalesce(f, cast(i as double), cast(j as double))):string>
-- !query 29 output
-30.3
-30.3
-332,8 +332,8 struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
-- !query 30
SELECT *
- FROM CASE_TBL a, CASE2_TBL b
- WHERE COALESCE(a.f, b.i, b.j) = 2
+ FROM CASE_TBL a, CASE2_TBL b
+ WHERE udf(COALESCE(a.f, b.i, b.j)) = 2
-- !query 30 schema
struct<i:int,f:double,i:int,j:int>
-- !query 30 output
-342,7 +342,7 struct<i:int,f:double,i:int,j:int>
-- !query 31
-SELECT '' AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
+SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
NULLIF(b.i, 4) AS `NULLIF(b.i,4)`
FROM CASE_TBL a, CASE2_TBL b
-- !query 31 schema
-377,7 +377,7 struct<Five:string,NULLIF(a.i,b.i):int,NULLIF(b.i,4):int>
-- !query 32
SELECT '' AS `Two`, *
FROM CASE_TBL a, CASE2_TBL b
- WHERE COALESCE(f,b.i) = 2
+ WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean)
-- !query 32 schema
struct<Two:string,i:int,f:double,i:int,j:int>
-- !query 32 output
-388,15 +388,15 struct<Two:string,i:int,f:double,i:int,j:int>
-- !query 33
SELECT CASE
(CASE vol('bar')
- WHEN 'foo' THEN 'it was foo!'
- WHEN vol(null) THEN 'null input'
+ WHEN udf('foo') THEN 'it was foo!'
+ WHEN udf(vol(null)) THEN 'null input'
WHEN 'bar' THEN 'it was bar!' END
)
- WHEN 'it was foo!' THEN 'foo recognized'
- WHEN 'it was bar!' THEN 'bar recognized'
- ELSE 'unrecognized' END
+ WHEN udf('it was foo!') THEN 'foo recognized'
+ WHEN 'it was bar!' THEN udf('bar recognized')
+ ELSE 'unrecognized' END AS col
-- !query 33 schema
-struct<CASE WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
+struct<col:string>
-- !query 33 output
bar recognized
```
</p>
</details>
https://github.com/apache/spark/pull/25069 contains the same minor fixes as it's required to write the tests.
## How was this patch tested?
Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
Closes#25070 from HyukjinKwon/SPARK-28273.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
In Dataset drop(col: Column) method, the `equals` comparison method was used instead of `semanticEquals`, which caused the problem of abnormal case-sensitivity behavior. When attributes of LogicalPlan are checked for equality, `semanticEquals` should be used instead.
A similar PR I referred to: https://github.com/apache/spark/pull/22713 created by mgaido91
## How was this patch tested?
- Added new unit test case in DataFrameSuite
- ./build/sbt "testOnly org.apache.spark.sql.*"
- The python code from ticket reporter at https://issues.apache.org/jira/browse/SPARK-28189Closes#25055 from Tonix517/SPARK-28189.
Authored-by: Tony Zhang <tony.zhang@uber.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR adds some more WITH test cases as a follow-up to https://github.com/apache/spark/pull/24842
## How was this patch tested?
Add new UTs.
Closes#24949 from peter-toth/SPARK-28002-follow-up.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type .
Had an offline discussion with rxin, mengxr and cloud-fan
The reason is basically:
1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764
Renaming `SCALAR_ITER` as `MAP_ITER` is abandoned due to 2. reason.
For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic with `mapPartitionsInPandas`.
## How was this patch tested?
Existing tests should cover.
Closes#25044 from HyukjinKwon/SPARK-28198.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR adds support of `WITH` clause within a subquery so this query becomes valid:
```
SELECT max(c) FROM (
WITH t AS (SELECT 1 AS c)
SELECT * FROM t
)
```
## How was this patch tested?
Added new UTs.
Closes#24831 from peter-toth/SPARK-19799-2.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in #24706. This rule is used to adjust the post shuffle partitions based on the map output statistics.
## How was this patch tested?
Added ReduceNumShufflePartitionsSuite
Closes#24978 from carsonwang/reduceNumShufflePartitions.
Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This pr add calculate local directory size to `SQLTestUtils`.
We can avoid these changes after this pr:
![image](https://user-images.githubusercontent.com/5399861/60386910-66ca8680-9ace-11e9-8d52-e1eea38e324a.png)
## How was this patch tested?
Existing test
Closes#25014 from wangyum/SPARK-28216.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR makes the predicate pushdown logic in catalyst optimizer more efficient by unifying two existing rules `PushdownPredicates` and `PushPredicateThroughJoin`. Previously pushing down a predicate for queries such as `Filter(Join(Join(Join)))` requires n steps. This patch essentially reduces this to a single pass.
To make this actually work, we need to unify a few rules such as `CombineFilters`, `PushDownPredicate` and `PushDownPrdicateThroughJoin`. Otherwise cases such as `Filter(Join(Filter(Join)))` still requires several passes to fully push down predicates. This unification is done by composing several partial functions, which makes a minimal code change and can reuse existing UTs.
Results show that this optimization can improve the catalyst optimization time by 16.5%. For queries with more joins, the performance is even better. E.g., for TPC-DS q64, the performance boost is 49.2%.
## How was this patch tested?
Existing UTs + new a UT for the new rule.
Closes#24956 from yeshengm/fixed-point-opt.
Authored-by: Yesheng Ma <kimi.ysma@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
In some cases, executeTake in SparkPlan could decode more than necessary.
For example, in case of below odd/even number partitioning, total row's count from partitions will be 100, although it is limited with 51. And 'executeTake' in SparkPlan decodes all of them, "49" rows of which are unnecessarily decoded.
```scala
spark.sparkContext.parallelize((0 until 100).map(i => (i, 1))).toDF()
.repartitionByRange(2, $"_1" % 2).limit(51).collect()
```
By using a iterator of the scalar collection, we can make ensure that at most n rows are decoded.
## How was this patch tested?
Existing unit tests that call limit function of DataFrame.
testOnly *SQLQuerySuite
testOnly *DataFrameSuite
Closes#22347 from Dooyoung-Hwang/refactor_execute_take.
Authored-by: Dooyoung Hwang <dooyoung.hwang@sk.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>