### What changes were proposed in this pull request?
Add a database-exists check to `SessionCatalog`.
### Why are the changes needed?
Currently, executing `drop database test` on a non-existent database throws an unfriendly error message:
```
Error in query: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
at org.apache.spark.sql.hive.HiveExternalCatalog.dropDatabase(HiveExternalCatalog.scala:200)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.dropDatabase(ExternalCatalogWithListener.scala:53)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.dropDatabase(SessionCatalog.scala:273)
at org.apache.spark.sql.execution.command.DropDatabaseCommand.run(ddl.scala:111)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3707)
```
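A minimal sketch of the idea, with names modeled on `SessionCatalog`'s existing helpers rather than taken from the exact patch: check that the database exists up front and raise a friendly `AnalysisException` subtype instead of surfacing the Hive client's `NoSuchObjectException`.
```scala
// Hypothetical sketch, not the exact patch: fail fast with a friendly error.
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
  val dbName = formatDatabaseName(db)
  if (!ignoreIfNotExists && !externalCatalog.databaseExists(dbName)) {
    throw new NoSuchDatabaseException(dbName) // an AnalysisException subtype
  }
  externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}
```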
### Does this PR introduce _any_ user-facing change?
Yes, a cleaner error message.
### How was this patch tested?
Add test.
Closes #32768 from ulysses-you/SPARK-35629.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Sets `references` for `NamedLambdaVariable` and `LambdaFunction`.
| Expression | NamedLambdaVariable | LambdaFunction |
| --- | --- | --- |
| References before | None | All function references |
| References after | self.toAttribute | Function references minus arguments' references |
In `NestedColumnAliasing`, this means that `ExtractValue(ExtractValue(attr, lv: NamedLambdaVariable), ...)` now references both `attr` and `lv`, rather than just `attr`. As a result, it will not be included in the nested column references.
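To make the table concrete, here is a self-contained toy model of the reference bookkeeping (deliberately not Spark's actual classes):
```scala
// Toy model: a lambda variable references itself; a lambda function references
// whatever its body references, minus the variables the lambda itself binds.
case class Attr(name: String)

sealed trait Expr { def references: Set[Attr] }
case class LambdaVar(attr: Attr) extends Expr {
  def references: Set[Attr] = Set(attr) // before this PR: Set.empty
}
case class Extract(child: Expr) extends Expr {
  def references: Set[Attr] = child.references
}
case class Lambda(body: Expr, args: Seq[LambdaVar]) extends Expr {
  def references: Set[Attr] =
    body.references -- args.flatMap(_.references).toSet // before: body.references
}
```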
### Why are the changes needed?
Previously, the lambda key was referenced outside of the lambda function.
#### Example 1
Before:
```
Project [transform(keys#0, lambdafunction(_extract_v1#0, lambda key#0, false)) AS a#0]
+- 'Join Cross
:- Project [kvs#0[lambda key#0].v1 AS _extract_v1#0]
: +- LocalRelation <empty>, [kvs#0]
+- LocalRelation <empty>, [keys#0]
```
After:
```
Project [transform(keys#418, lambdafunction(kvs#417[lambda key#420].v1, lambda key#420, false)) AS a#419]
+- Join Cross
:- LocalRelation <empty>, [kvs#417]
+- LocalRelation <empty>, [keys#418]
```
#### Example 2
Before:
```
Project [transform(keys#0, lambdafunction(kvs#0[lambda key#0].v1, lambda key#0, false)) AS a#0]
+- GlobalLimit 5
+- LocalLimit 5
+- Project [keys#0, _extract_v1#0 AS _extract_v1#0]
+- GlobalLimit 5
+- LocalLimit 5
+- Project [kvs#0[lambda key#0].v1 AS _extract_v1#0, keys#0]
+- LocalRelation <empty>, [kvs#0, keys#0]
```
After:
```
Project [transform(keys#428, lambdafunction(kvs#427[lambda key#430].v1, lambda key#430, false)) AS a#429]
+- GlobalLimit 5
+- LocalLimit 5
+- Project [keys#428, kvs#427]
+- GlobalLimit 5
+- LocalLimit 5
+- LocalRelation <empty>, [kvs#427, keys#428]
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Scala unit tests for the examples above
Closes #32773 from karenfeng/SPARK-35636.
Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR adds `In`/`InSet` predicate support to `UnwrapCastInBinaryComparison`.
The current implementation doesn't push down filters for `In`/`InSet` predicates that contain a `Cast`.
For instance:
```scala
spark.range(50).selectExpr("cast(id as int) as id").write.mode("overwrite").parquet("/tmp/parquet/t1")
spark.read.parquet("/tmp/parquet/t1").where("id in (1L, 2L, 4L)").explain
```
Before this PR:
```
== Physical Plan ==
*(1) Filter cast(id#5 as bigint) IN (1,2,4)
+- *(1) ColumnarToRow
+- FileScan parquet [id#5] Batched: true, DataFilters: [cast(id#5 as bigint) IN (1,2,4)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/parquet/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
After this PR:
```
== Physical Plan ==
*(1) Filter id#95 IN (1,2,4)
+- *(1) ColumnarToRow
+- FileScan parquet [id#95] Batched: true, DataFilters: [id#95 IN (1,2,4)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/parquet/t1], PartitionFilters: [], PushedFilters: [In(id, [1,2,4])], ReadSchema: struct<id:int>
```
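Conceptually, the rewrite looks like the following; `fitsIn` and `downCast` are illustrative helpers, not Spark's API, and the real rule must also handle list values that do not fit in the narrower type.
```scala
// Hedged sketch: cast(e as wider) IN (v1, v2, ...) => e IN (narrowed values),
// which lets the In predicate be pushed down to the data source.
plan.transformAllExpressions {
  case In(Cast(child, _, _), list) if list.forall(fitsIn(_, child.dataType)) =>
    In(child, list.map(downCast(_, child.dataType)))
}
```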
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes #32488 from cfmcgrady/SPARK-35316.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR removes the `canPlanAsBroadcastHashJoin` check in `EliminateOuterJoin`.
### Why are the changes needed?
We can always remove an outer join if it only has DISTINCT on the streamed side.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #32744 from wangyum/SPARK-34808-2.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR groups exception messages in `sql/hive/src/main/scala/org/apache/spark/sql/hive/execution`.
### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.
### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.
### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.
Closes #32694 from beliefer/SPARK-35059.
Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, we do not have a suitable definition of the `user` concept in Spark. We only have an application-wide `sparkUser`, and do not support identifying or retrieving the user of a session in STS or of a runtime query execution.
`current_user()` is very popular, supported by plenty of other modern and old-school databases, and ANSI compliant.
This PR adds `current_user()` as a SQL function, without ambiguity:
1. For a normal single-threaded Spark application, the `sparkUser` is clearly always equivalent to `current_user()`.
2. For a multi-threaded Spark application, e.g. Spark Thrift Server, we use a `ThreadLocal` variable to store the client-side user (after authentication) before running the query, and retrieve it in the parser; see the sketch below.
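A hedged sketch of the `ThreadLocal` pattern; the object and method names here are assumptions for illustration, not the exact Spark API.
```scala
// Store the authenticated client-side user per thread; the parser falls back
// to the application-wide sparkUser when nothing was set.
object CurrentUserContext {
  val CURRENT_USER: ThreadLocal[String] = new ThreadLocal[String]()
}

def runWithUser[T](user: String)(query: => T): T = {
  CurrentUserContext.CURRENT_USER.set(user)
  try query finally CurrentUserContext.CURRENT_USER.remove()
}

def currentUser(fallbackSparkUser: String): String =
  Option(CurrentUserContext.CURRENT_USER.get()).getOrElse(fallbackSparkUser)
```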
### Why are the changes needed?
`current_user()` is very popular, supported by plenty of other modern and old-school databases, and ANSI compliant.
### Does this PR introduce _any_ user-facing change?
Yes, `current_user()` was added as a SQL function.
### How was this patch tested?
New tests in the Thrift server and sql/catalyst.
Closes #32718 from yaooqinn/SPARK-21957.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add the rule `ConvertToLocalRelation` to the AQE optimizer.
### Why are the changes needed?
Support propagating an empty local relation through Project and Filter, e.g. for a plan like:
```
Aggregate
Project
Join
ShuffleStage
ShuffleStage
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes #32724 from ulysses-you/SPARK-35585.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The condition check for FULL OUTER sort merge join (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L1368) makes an unnecessary trip when `leftIndex == leftMatches.size` or `rightIndex == rightMatches.size`. This does not affect correctness (`scanNextInBuffered()` returns false anyway), but we can avoid it in the first place.
### Why are the changes needed?
Better readability for developers and avoidance of unnecessary execution.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, such as `OuterJoinSuite.scala`.
Closes #32736 from c21/join-bug.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
### What changes were proposed in this pull request?
A test case in `AdaptiveQueryExecSuite` became flaky because there are too many debug logs in the root logger:
https://github.com/Yikun/spark/runs/2715222392?check_suite_focus=true
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139125/testReport/
To fix it, I suggest supporting multiple loggers in the testing method `withLogAppender`, so that the `LogAppender` gets clean target log output.
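A minimal sketch of the enhanced helper, assuming log4j 1.x (which this Spark version uses); the exact signature may differ from the patch.
```scala
import org.apache.log4j.{Appender, Logger}

// Attach the test appender to specific loggers instead of the root logger,
// so it only captures the target log output.
def withLogAppender(appender: Appender, loggerNames: Seq[String])(f: => Unit): Unit = {
  val loggers =
    if (loggerNames.nonEmpty) loggerNames.map(n => Logger.getLogger(n))
    else Seq(Logger.getRootLogger)
  loggers.foreach(_.addAppender(appender))
  try f finally loggers.foreach(_.removeAppender(appender))
}
```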
### Why are the changes needed?
Fix a flaky test case.
Also, reduce unnecessary memory cost in tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes #32725 from gengliangwang/fixFlakyLogAppender.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Move `SET` command-related test cases from `SQLQuerySuite` to a new test suite, `SetCommandSuite`. There are 7 test cases in total.
### Why are the changes needed?
Code refactoring. `SQLQuerySuite` is becoming big.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes #32732 from gengliangwang/setsuite.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to support special datetime values introduced by #25708 and by #25716 only in typed literals, and don't recognize them in parsing strings to dates/timestamps. The following string values are supported only in typed timestamp literals:
- `epoch [zoneId]` - `1970-01-01 00:00:00+00` (Unix system time zero)
- `today [zoneId]` - midnight today.
- `yesterday [zoneId]` - midnight yesterday
- `tomorrow [zoneId]` - midnight tomorrow
- `now` - current query start time.
For example:
```sql
spark-sql> SELECT timestamp 'tomorrow';
2019-09-07 00:00:00
```
Similarly, the following special date values are supported only in typed date literals:
- `epoch [zoneId]` - `1970-01-01`
- `today [zoneId]` - the current date in the time zone specified by `spark.sql.session.timeZone`.
- `yesterday [zoneId]` - the current date minus 1 day
- `tomorrow [zoneId]` - the current date plus 1 day
- `now` - the date of running the current query. It has the same meaning as `today`.
For example:
```sql
spark-sql> SELECT date 'tomorrow' - date 'yesterday';
2
```
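A hedged illustration of the behavior difference, shown via the Scala API:
```scala
// Special strings keep working in typed literals...
spark.sql("SELECT timestamp 'now'").show()          // the current query start time
// ...but after this change a plain string cast no longer resolves them:
spark.sql("SELECT cast('now' as timestamp)").show() // NULL instead of a timestamp
```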
### Why are the changes needed?
In the current implementation, Spark supports the special date/timestamp values in any input string cast to dates/timestamps, which leads to the following problems:
- If executors have different system times, the result is inconsistent and effectively random, since column values depend on where the conversions were performed.
- The special values play the role of distributed non-deterministic functions, though users might think of the values as constants.
### Does this PR introduce _any_ user-facing change?
Yes, but the probability of being affected should be small.
### How was this patch tested?
By running existing test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z interval.sql"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z date.sql"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z timestamp.sql"
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
```
Closes #32714 from MaxGekk/remove-datetime-special-values.
Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This PR adds a unit test to show a bug in the latest janino version which fails to compile valid Java code. Unfortunately, I can't share the exact query that can trigger this bug (includes some custom expressions), but this pattern is not very uncommon and I believe can be triggered by some real queries.
A follow-up is needed before the 3.2 release, to either fix this bug in janino, or revert the janino version upgrade, or work around it in Spark.
### Why are the changes needed?
Make it easy for people to debug janino, as I'm not a janino expert.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes #32716 from cloud-fan/janino.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Currently, the results of the following SQL queries are not redacted:
```
SET [KEY];
SET;
```
For example:
```
scala> spark.sql("set javax.jdo.option.ConnectionPassword=123456").show()
+--------------------+------+
| key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+
scala> spark.sql("set javax.jdo.option.ConnectionPassword").show()
+--------------------+------+
| key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+
scala> spark.sql("set").show()
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|javax.jdo.option....| 123456|
```
We should hide the sensitive information and redact the query output.
### Why are the changes needed?
Security.
### Does this PR introduce _any_ user-facing change?
Yes, the sensitive information in the output of SET commands is redacted.
### How was this patch tested?
Unit test
Closes #32712 from gengliangwang/redactSet.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Handle curried `Product` classes while serializing a `TreeNode` to JSON. While processing [Product](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L820), we may hit an assertion error for cases like curried `Product`s because of a size mismatch between the field names and field values.
Fall back to using reflection to get all the values for constructor parameters when we meet such cases.
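An illustration with a hypothetical curried case class, showing why the sizes can differ:
```scala
// productIterator only covers the first parameter list, while reflection over
// the constructor sees both parameter lists.
case class CurriedNode(name: String)(val weight: Int)

val node = CurriedNode("a")(1)
assert(node.productArity == 1) // only `name`; the constructor has 2 parameters
```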
### Why are the changes needed?
Avoid throwing an error while serializing a `TreeNode` to JSON, and try to output as much information as possible.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UT case added.
Closes #32713 from ivoson/SPARK-35411-followup.
Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR adds a new rule that removes an outer join if it only has DISTINCT on the streamed side. For example:
```scala
spark.range(200L).selectExpr("id AS a").createTempView("t1")
spark.range(300L).selectExpr("id AS b").createTempView("t2")
spark.sql("SELECT DISTINCT a FROM t1 LEFT JOIN t2 ON a = b").explain(true)
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [a#2L], [a#2L]
+- Project [a#2L]
+- Join LeftOuter, (a#2L = b#6L)
:- Project [id#0L AS a#2L]
: +- Range (0, 200, step=1, splits=Some(2))
+- Project [id#4L AS b#6L]
+- Range (0, 300, step=1, splits=Some(2))
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [a#2L], [a#2L]
+- Project [id#0L AS a#2L]
+- Range (0, 200, step=1, splits=Some(2))
```
### Why are the changes needed?
Improve query performance. [DB2](https://www.ibm.com/docs/en/db2-for-zos/11?topic=manipulation-how-db2-simplifies-join-operations) supports this feature:
![image](https://user-images.githubusercontent.com/5399861/119594277-0d7c4680-be0e-11eb-8bd4-366d8c4639f0.png)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #31908 from wangyum/SPARK-34808.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only counted input rows; the optionally restored rows were not counted.
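A hedged sketch of the adjusted accounting; the surrounding iterator logic is heavily simplified and `getKey` is an illustrative stand-in.
```scala
// Count the optionally restored row too, instead of one per input row.
iter.flatMap { row =>
  val restored = Option(store.get(getKey(row))) // previously saved state, if any
  val out = restored.toSeq :+ row
  numOutputRows += out.size // before the fix, effectively: numOutputRows += 1
  out
}
```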
### Why are the changes needed?
Currently the number of output rows of `StateStoreRestoreExec` only counts the input rows, but the operator actually outputs the input rows plus the optional restored rows. We should report the correct number of output rows.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #32703 from viirya/fix-outputrows.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR refactors the `SubqueryExpression` class. It removes the `children` field from `SubqueryExpression`'s constructor and adds `outerAttrs` and `joinCond`.
### Why are the changes needed?
Currently, the children field of a subquery expression is used to store both collected outer references in the subquery plan and join conditions after correlated predicates are pulled up.
For example:
`SELECT (SELECT max(c1) FROM t1 WHERE t1.c1 = t2.c1) FROM t2`
During the analysis phase, outer references in the subquery are stored in the `children` field: `scalar-subquery [t2.c1]`. After the optimizer rule `PullupCorrelatedPredicates`, however, the `children` field is used to store the join conditions, which contain both the inner and the outer references: `scalar-subquery [t1.c1 = t2.c1]`. This is why the references of `SubqueryExpression` exclude the inner plan's output:
29ed1a2de4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala (L68-L69)
This can be confusing and error-prone. The references for a subquery expression should always be defined as outer attribute references.
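A simplified sketch of the refactored constructor shape, as described above (other members elided):
```scala
abstract class SubqueryExpression(
    plan: LogicalPlan,
    outerAttrs: Seq[Expression], // collected outer references
    exprId: ExprId,
    joinCond: Seq[Expression])   // filled in by PullupCorrelatedPredicates
  extends PlanExpression[LogicalPlan] {
  // References are now always the outer attribute references; no need to
  // subtract the inner plan's output.
  override def references: AttributeSet =
    AttributeSet.fromAttributeSets(outerAttrs.map(_.references))
}
```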
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #32687 from allisonwang-db/refactor-subquery-expr.
Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
After SPARK-29291 and SPARK-33352, there are still some compilation warnings about `procedure syntax is deprecated` as follows:
```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:723: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `registerMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:748: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `unregisterMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala:223: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testSimpleSpillingForAllCodecs`'s return type
[WARNING] [Warn] /spark/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASBenchmark.scala:53: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `runBLASBenchmark`'s return type
[WARNING] [Warn] /spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:110: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `assertEmptyRootPath`'s return type
[WARNING] [Warn] /spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:602: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `executeCTASWithNonEmptyLocation`'s return type
```
The main change of this PR is to clean up these compilation warnings.
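For reference, the deprecated form and its fix look like this:
```scala
// Procedure syntax, deprecated since Scala 2.13:
def before(x: Int) { println(x) }
// Fixed: explicitly declare the Unit return type.
def after(x: Int): Unit = { println(x) }
```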
### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13; this change is also compatible with Scala 2.12.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Actions builds.
Closes #32669 from LuciferYang/re-clean-procedure-syntax.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Added the following TreePattern enums:
- EXCHANGE
- IN_SUBQUERY_EXEC
- UPDATE_FIELDS
Migrated `transformAllExpressions` call sites to use `transformAllExpressionsWithPruning`.
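A sketch of a migrated call site; `optimizeUpdateFields` stands in for the actual rule body.
```scala
import org.apache.spark.sql.catalyst.expressions.UpdateFields
import org.apache.spark.sql.catalyst.trees.TreePattern.UPDATE_FIELDS

// The pruning condition skips any subtree whose pattern bits show it cannot
// contain an UpdateFields node, avoiding a full traversal.
plan.transformAllExpressionsWithPruning(_.containsPattern(UPDATE_FIELDS)) {
  case u: UpdateFields => optimizeUpdateFields(u)
}
```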
### Why are the changes needed?
Reduce the number of tree traversals and hence improve the query compilation latency.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Perf diff:
| Rule name | Total Time (baseline) | Total Time (experiment) | experiment/baseline |
| --- | --- | --- | --- |
| OptimizeUpdateFields | 54646396 | 27444424 | 0.5 |
| ReplaceUpdateFieldsExpression | 24694303 | 2087517 | 0.08 |
Closes #32643 from sigmod/all_expressions.
Authored-by: Yingyi Bu <yingyi.bu@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
### What changes were proposed in this pull request?
I just noticed that `AdaptiveQueryExecSuite.SPARK-34091: Batch shuffle fetch in AQE partition coalescing` takes more than 10 minutes to finish, which is unacceptable.
This PR sets the shuffle partitions to 10 in that test, so that the test can finish in about 5 seconds.
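Presumably the change amounts to pinning the shuffle partition number in the test, along these lines:
```scala
// Hedged sketch: run the SPARK-34091 test with a small shuffle partition count.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
  // ... the existing batch shuffle fetch assertions ...
}
```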
### Why are the changes needed?
Speed up the test.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
Closes #32695 from cloud-fan/test.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR fixes a build error with Scala 2.13 on GA.
#32301 seems to have introduced this error.
### Why are the changes needed?
To recover CI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA
Closes #32696 from sarutak/followup-SPARK-35194.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Refactors `NestedColumnAliasing` and `GeneratorNestedColumnAliasing` for readability.
### Why are the changes needed?
Improves readability for future maintenance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #32301 from karenfeng/refactor-nested-column-aliasing.
Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add a new method `isMaterialized` in `QueryStageExec`.
### Why are the changes needed?
Currently, we use `resultOption().get.isDefined` to check if a query stage has materialized. The code is not readable at a glance; it's better to define a method like `isMaterialized` for it.
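A hedged one-line sketch of the new method, mirroring the check quoted above:
```scala
def isMaterialized: Boolean = resultOption().get.isDefined
```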
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass CI.
Closes #32689 from ulysses-you/SPARK-35552.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Various small code simplifications/cleanups for `OptimizeSkewedJoin`.
### Why are the changes needed?
Code refactoring.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #32685 from cloud-fan/skew-join.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
Initial implementation of RocksDBCheckpointMetadata. It persists the metadata for RocksDBFileManager.
### Why are the changes needed?
The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains all RocksDB file names and the number of total keys.
The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS](https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2).
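A sketch of the metadata shape implied by the description; the field names are assumptions, not the exact class.
```scala
// Persisted per committed batch as JSON, e.g.
// {"sstFiles":["000008.sst","000011.sst"],"numKeys":42}
case class RocksDBCheckpointMetadata(
    sstFiles: Seq[String], // names of the RocksDB files backed up to DFS
    numKeys: Long)         // total number of keys in the checkpoint
```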
### Does this PR introduce _any_ user-facing change?
No. Internal implementation only.
### How was this patch tested?
New UT added.
Closes #32272 from xuanyuanking/SPARK-35172.
Lead-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Spark's `conv` function comes from MySQL, so it's better to follow MySQL's behavior: MySQL returns the max unsigned long if the input string is too big, and Spark should do the same.
However, Spark currently behaves differently in two cases:
- MySQL allows leading spaces, but Spark does not.
- If the input string is far too long, Spark fails with an `ArrayIndexOutOfBoundsException`.
This patch makes `conv` follow MySQL's behavior in those two cases (see the illustration below):
- `conv` allows leading spaces;
- `conv` returns the max unsigned long when the input string is far too long.
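A hedged illustration of the fixed behavior, following the MySQL semantics described above:
```scala
spark.sql("SELECT conv(' 100', 2, 10)").show()
// leading space tolerated => 4
spark.sql("SELECT conv('FFFFFFFFFFFFFFFFFFFFFFFFFF', 16, 10)").show()
// far too long => max unsigned long: 18446744073709551615
```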
### Why are the changes needed?
Fix `conv` to match the behavior of its (almost) only reference among other DBMSes, MySQL.
### Does this PR introduce _any_ user-facing change?
Yes, as pointed out above
### How was this patch tested?
Add test
Closes #32684 from dgd-contributor/SPARK-33428.
Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add a new data source V2 API: `LocalScan`. It is a special `Scan` that is executed locally on the driver instead of on executors.
### Why are the changes needed?
The new API improves the flexibility of the DSV2 API. It allows developers to implement connectors for data sources of small data sizes.
For example, we can build a data source for Spark History applications on top of the Spark History Server RESTful API. The result set is small, so fetching all the results on the Spark driver is good enough. Making it a data source allows us to run SQL queries with filters or table joins.
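A hedged sketch of such a connector; everything beyond the `LocalScan` trait itself (class name, helper) is an assumption for illustration.
```scala
// A driver-local scan over the History Server REST API: the whole (small)
// result set is produced on the driver, with no executor tasks needed.
class HistoryAppScan(schema: StructType) extends LocalScan {
  override def readSchema(): StructType = schema
  override def rows(): Array[InternalRow] = fetchAppsFromRestApi() // hypothetical helper
}
```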
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test
Closes #32678 from gengliangwang/LocalScan.
Lead-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR groups exception messages in `sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver`.
### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.
### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.
### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.
Closes #32646 from beliefer/SPARK-35057.
Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This is a followup from https://github.com/apache/spark/pull/32547#discussion_r639916474: for LEFT ANTI join, we do not need to depend on the `loaded` variable, as in `codegenAnti` we load `streamedAfter` no more than once (i.e. assign column values from the streamed row that are not used in the join condition).
### Why are the changes needed?
Avoid unnecessary processing in code-gen (though it's just `boolean $loaded = false;`, and `if (!$loaded) { $loaded = true; }`).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests in `ExistenceJoinSuite`.
Closes #32681 from c21/join-followup.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The main code changes are:
* Rename the rule `DemoteBroadcastHashJoin` to `DynamicJoinSelection` and add shuffled hash join selection code.
* Specify the join strategy hint `SHUFFLE_HASH` if AQE thinks a join can be converted to SHJ.
* Skip the `preferSortMerge` config check on the AQE side if a join can be converted to SHJ.
### Why are the changes needed?
Use AQE runtime statistics to decide if we can use shuffled hash join instead of sort merge join. Currently, the formula for shuffled hash join selection does not work due to the dynamic shuffle partition number.
Add a new config `spark.sql.adaptive.shuffledHashJoinLocalMapThreshold` to decide if a join can be converted to shuffled hash join safely.
### Does this PR introduce _any_ user-facing change?
Yes, add a new config.
### How was this patch tested?
Add test.
Closes #32550 from ulysses-you/SPARK-35282-2.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add metrics to record how many tasks fall back to sort-based aggregation during hash aggregation. This will help developers and users debug and optimize queries. Object hash aggregation already has similar metrics.
### Why are the changes needed?
Help developers and users debug and optimize queries that use hash aggregation.
### Does this PR introduce _any_ user-facing change?
Yes, the added metrics will show up in Spark web UI.
Example:
<img width="604" alt="Screen Shot 2021-05-26 at 12 17 08 AM" src="https://user-images.githubusercontent.com/4629931/119618437-bf3c5880-bdb7-11eb-89bb-5b88db78639f.png">
### How was this patch tested?
Changed unit test in `SQLMetricsSuite.scala`.
Closes #32671 from c21/agg-metrics.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR fixes `HiveExternalCatalogVersionsSuite`.
With this change, only the `<major>.<minor>` version is set to `spark.sql.hive.metastore.version`.
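A hedged sketch of the version-trimming idea (the regex approach is an assumption, not necessarily the exact patch):
```scala
// e.g. "2.3.8" => "2.3", which Spark 3.0.2 and 3.1.1 accept.
val hiveMetastoreVersion = """^\d+\.\d+""".r.findFirstIn(builtinHiveVersion).get
```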
### Why are the changes needed?
I'm personally checking whether all the tests pass with Java 11 on the current `master`, and I found that `HiveExternalCatalogVersionsSuite` fails.
The reason is that Spark 3.0.2 and 3.1.1 don't accept `2.3.8` as a Hive metastore version.
`HiveExternalCatalogVersionsSuite` downloads Spark releases from https://dist.apache.org/repos/dist/release/spark/ and runs the test for each release. The Spark releases are `3.0.2` and `3.1.1` for the current `master` for now.
e47e615c0e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L239-L259)
With Java 11, the suite runs with a Hive metastore version corresponding to the built-in Hive version, which is `2.3.8` for the current `master`.
20750a3f9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L62-L66)
But `branch-3.0` and `branch-3.1` don't accept `2.3.8`, so the suite fails with Java 11.
Another solution would be backporting SPARK-34271 (#31371), but after [a discussion](https://github.com/apache/spark/pull/32668#issuecomment-848435170), we prefer to fix the test.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests with CI.
Closes #32670 from sarutak/fix-version-suite-for-java11.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from 3.7.0-M5 to 3.7.0-M11.
Note: json4s versions greater than 3.7.0-M11 are not binary compatible with Spark's third-party jars.
### Why are the changes needed?
Multiple defect fixes and improvements, e.g.:
- https://github.com/json4s/json4s/issues/750
- https://github.com/json4s/json4s/issues/554
- https://github.com/json4s/json4s/issues/715
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran the existing UTs.
Closes #32636 from vinodkc/br_build_upgrade_json4s.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", or "built-in", to the `ExpressionInfo` for UDFs.
### Why are the changes needed?
Make the `ExpressionInfo` of UDFs more meaningful.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing and newly added UTs.
Closes #32587 from linhongliu-db/udf-language.
Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to use proper built-in exceptions instead of the plain `Exception` in Python.
While I am here, I fixed another minor issue in `DataFrame.schema` as well:
```diff
- except AttributeError as e:
- raise Exception(
- "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+ raise ValueError(
+ "Unable to parse datatype from schema. %s" % e) from e
```
Now it catches all exceptions during schema parsing and chains the original exception with a `ValueError`. Previously it only caught `AttributeError`, which does not cover all cases.
### Why are the changes needed?
For users to expect the proper exceptions.
### Does this PR introduce _any_ user-facing change?
Yes, the exception classes became different, but they should be compatible because the previous exception was the plain `Exception`, which the other exceptions inherit.
### How was this patch tested?
Existing unit tests should cover this.
Closes #31238
Closes #32650 from HyukjinKwon/SPARK-32194.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR improves the interaction between partition coalescing and skew handling by moving the skew join rule ahead of the partition coalescing rule and making corresponding changes to the two rules:
1. Simplify `OptimizeSkewedJoin` as it doesn't need to handle `CustomShuffleReaderExec` anymore.
2. Update `CoalesceShufflePartitions` to support coalescing non-skewed partitions.
### Why are the changes needed?
It's a bit hard to reason about skew joins if partitions have been coalesced: a skewed partition needs to be much larger than the other partitions, and we need to look at the raw sizes before coalescing.
It also makes `OptimizeSkewedJoin` more robust, as we don't need to worry about a skewed partition being coalesced with a small partition, which would break skew join handling.
It also helps with https://github.com/apache/spark/pull/31653 , which needs to move `OptimizeSkewedJoin` to an earlier phase and run before `CoalesceShufflePartitions`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UT and existing tests.
Closes #32594 from cloud-fan/shuffle.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
* Remove `EliminateUnnecessaryJoin` and use `AQEPropagateEmptyRelation` instead.
* Eliminate join, aggregate, limit, repartition, sort, and generate nodes where beneficial.
### Why are the changes needed?
Make the `EliminateUnnecessaryJoin` optimization applicable to more cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes #32602 from ulysses-you/SPARK-35455.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Addressed dongjoon-hyun's comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example, in
```
dataset
.dropDuplicates()
.groupBy('a)
.agg(max('b))
```
the `dropDuplicates` is not needed, because the result of `max` does not depend on duplicate values.
### Why are the changes needed?
Improve performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #31914 from tanelk/SPARK-33122_redundant_aggs_followup.
Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue where `RemoveRedundantProjects` removes a `ProjectExec` that is needed for generating `UnsafeRow`s.
In `DataSourceV2Strategy`, a `ProjectExec` is inserted to ensure internal rows are `UnsafeRow`s:
```scala
private def withProjectAndFilter(
    project: Seq[NamedExpression],
    filters: Seq[Expression],
    scan: LeafExecNode,
    needsUnsafeConversion: Boolean): SparkPlan = {
  val filterCondition = filters.reduceLeftOption(And)
  val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
  if (withFilter.output != project || needsUnsafeConversion) {
    ProjectExec(project, withFilter)
  } else {
    withFilter
  }
}
...
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
  // projection and filters were already pushed down in the optimizer.
  // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
  // not support columnar, a projection is added to convert the rows to UnsafeRow.
  val batchExec = BatchScanExec(relation.output, relation.scan)
  withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading to a `ClassCastException`.
A concrete example to reproduce this issue was reported:
```scala
import scala.collection.JavaConverters._
import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()

  test("RemoveRedundantProjects removes non-redundant projects") {
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
      SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data = spark.range(3).toDF
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()
        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info] at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info] at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes #32606 from sarutak/fix-project-removal-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Use `SpecificInternalRow` instead of `GenericInternalRow` to avoid boxing / unboxing cost.
### Why are the changes needed?
Since it doesn't know the input row schema, `GenericInternalRow` potentially needs to apply boxing to input arguments. It's better to use `SpecificInternalRow` instead, since we know the input data types.
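An illustration of the difference; `SpecificInternalRow` is backed by type-specific mutable values, so primitive writes avoid allocation:
```scala
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DoubleType, IntegerType}

val row = new SpecificInternalRow(Seq(IntegerType, DoubleType))
row.setInt(0, 42)      // raw Int, no java.lang.Integer boxing
row.setDouble(1, 3.14) // raw Double, no java.lang.Double boxing
```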
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #32647 from sunchao/specific-input-row.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes a bug in subexpression elimination for `CaseWhen` statements. https://github.com/apache/spark/pull/30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for an expression to be in "all branches" of a `CaseWhen` statement, it must also be in the `elseValue`.
### Why are the changes needed?
Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues, especially with a UDF in a branch that gets executed assuming the condition is true.
### Does this PR introduce _any_ user-facing change?
Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```
`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.
### How was this patch tested?
Updated existing test with new case.
Closes #32595 from Kimahriman/bug-case-subexpr-elimination.
Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>