Commit graph

11252 commits

Author SHA1 Message Date
Yuming Wang 6cd6c438f2 [SPARK-34808][SQL] Removes outer join if it only has DISTINCT on streamed side
### What changes were proposed in this pull request?

This pr add new rule to removes outer join if it only has distinct on streamed side. For example:
```scala
spark.range(200L).selectExpr("id AS a").createTempView("t1")
spark.range(300L).selectExpr("id AS b").createTempView("t2")
spark.sql("SELECT DISTINCT a FROM t1 LEFT JOIN t2 ON a = b").explain(true)
```

Before this pr:
```
== Optimized Logical Plan ==
Aggregate [a#2L], [a#2L]
+- Project [a#2L]
   +- Join LeftOuter, (a#2L = b#6L)
      :- Project [id#0L AS a#2L]
      :  +- Range (0, 200, step=1, splits=Some(2))
      +- Project [id#4L AS b#6L]
         +- Range (0, 300, step=1, splits=Some(2))
```

After this pr:
```
== Optimized Logical Plan ==
Aggregate [a#2L], [a#2L]
+- Project [id#0L AS a#2L]
   +- Range (0, 200, step=1, splits=Some(2))
```

### Why are the changes needed?

Improve query performance. [DB2](https://www.ibm.com/docs/en/db2-for-zos/11?topic=manipulation-how-db2-simplifies-join-operations) support this feature:
![image](https://user-images.githubusercontent.com/5399861/119594277-0d7c4680-be0e-11eb-8bd4-366d8c4639f0.png)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31908 from wangyum/SPARK-34808.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-05-31 18:14:15 +08:00
Liang-Chi Hsieh 73ba4492b1 [SPARK-35566][SS] Fix StateStoreRestoreExec output rows
### What changes were proposed in this pull request?

This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only count input rows, but the optionally restored rows are not counted in.

### Why are the changes needed?

Currently the number of output rows of `StateStoreRestoreExec` only counts the each input row. But it actually outputs input rows + optional restored rows. We should provide correct number of output rows.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32703 from viirya/fix-outputrows.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-31 16:45:56 +09:00
allisonwang-db 806da9d6fa [SPARK-35545][SQL] Split SubqueryExpression's children field into outer attributes and join conditions
### What changes were proposed in this pull request?
This PR refactors `SubqueryExpression` class. It removes the children field from SubqueryExpression's constructor and adds `outerAttrs` and `joinCond`.

### Why are the changes needed?
Currently, the children field of a subquery expression is used to store both collected outer references in the subquery plan and join conditions after correlated predicates are pulled up.

For example:
`SELECT (SELECT max(c1) FROM t1 WHERE t1.c1 = t2.c1) FROM t2`

During the analysis phase, outer references in the subquery are stored in the children field: `scalar-subquery [t2.c1]`, but after the optimizer rule `PullupCorrelatedPredicates`, the children field will be used to store the join conditions, which contain both the inner and the outer references: `scalar-subquery [t1.c1 = t2.c1]`. This is why the references of SubqueryExpression excludes the inner plan's output:
29ed1a2de4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala (L68-L69)

This can be confusing and error-prone. The references for a subquery expression should always be defined as outer attribute references.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #32687 from allisonwang-db/refactor-subquery-expr.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-31 04:57:24 +00:00
yangjie01 09d039da56 [SPARK-35526][CORE][SQL][ML][MLLIB] Re-Cleanup procedure syntax is deprecated compilation warning in Scala 2.13
### What changes were proposed in this pull request?
After SPARK-29291 and SPARK-33352, there are still some compilation warnings about `procedure syntax is deprecated` as follows:

```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:723: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `registerMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:748: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `unregisterMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala:223: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testSimpleSpillingForAllCodecs`'s return type
[WARNING] [Warn] /spark/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASBenchmark.scala:53: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `runBLASBenchmark`'s return type
[WARNING] [Warn] /spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:110: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `assertEmptyRootPath`'s return type
[WARNING] [Warn] /spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:602: [deprecation   | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `executeCTASWithNonEmptyLocation`'s return type
```

So the main change of this pr is cleanup these compilation warnings.

### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13 and this change should be compatible with Scala 2.12

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes #32669 from LuciferYang/re-clean-procedure-syntax.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-30 16:49:47 -07:00
Yingyi Bu 5c8a141d03 [SPARK-35538][SQL] Migrate transformAllExpressions call sites to use transformAllExpressionsWithPruning
### What changes were proposed in this pull request?

Added the following TreePattern enums:
- EXCHANGE
- IN_SUBQUERY_EXEC
- UPDATE_FIELDS

Migrated `transformAllExpressions` call sites to use `transformAllExpressionsWithPruning`

### Why are the changes needed?

Reduce the number of tree traversals and hence improve the query compilation latency.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.
Perf diff:
Rule name | Total Time (baseline) | Total Time (experiment) | experiment/baseline
OptimizeUpdateFields | 54646396 | 27444424 | 0.5
ReplaceUpdateFieldsExpression  | 24694303 | 2087517 | 0.08

Closes #32643 from sigmod/all_expressions.

Authored-by: Yingyi Bu <yingyi.bu@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2021-05-28 15:36:25 -07:00
Wenchen Fan 678592a612 [SPARK-35559][TEST] Speed up one test in AdaptiveQueryExecSuite
### What changes were proposed in this pull request?

I just noticed that `AdaptiveQueryExecSuite.SPARK-34091: Batch shuffle fetch in AQE partition coalescing` takes more than 10 minutes to finish, which is unacceptable.

This PR sets the shuffle partitions to 10 in that test, so that the test can finish with 5 seconds.

### Why are the changes needed?

speed up the test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #32695 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-28 12:39:34 -07:00
Kousuke Saruta b763db3efd [SPARK-35194][SQL][FOLLOWUP] Recover build error with Scala 2.13 on GA
### What changes were proposed in this pull request?

This PR fixes a build error with Scala 2.13 on GA.
#32301 seems to bring this error.

### Why are the changes needed?

To recover CI.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA

Closes #32696 from sarutak/followup-SPARK-35194.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-05-29 00:11:16 +09:00
Karen Feng e8631660ec [SPARK-35194][SQL] Refactor nested column aliasing for readability
### What changes were proposed in this pull request?

Refactors `NestedColumnAliasing` and `GeneratorNestedColumnAliasing` for readability.

### Why are the changes needed?

Improves readability for future maintenance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32301 from karenfeng/refactor-nested-column-aliasing.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-28 13:18:44 +00:00
ulysses-you 3b94aad5e7 [SPARK-35552][SQL] Make query stage materialized more readable
### What changes were proposed in this pull request?

Add a new method `isMaterialized` in `QueryStageExec`.

### Why are the changes needed?

Currently, we use `resultOption().get.isDefined` to check if a query stage has materialized. The code is not readable at a glance. It's better to use a new method like `isMaterialized` to define it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #32689 from ulysses-you/SPARK-35552.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-05-28 20:42:11 +08:00
Wenchen Fan 29ed1a2de4 [SPARK-35541][SQL] Simplify OptimizeSkewedJoin
### What changes were proposed in this pull request?

Various small code simplification/cleanup for OptimizeSkewedJoin

### Why are the changes needed?

code refactor

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #32685 from cloud-fan/skew-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-05-27 09:17:28 -07:00
Yuanjian Li f98a063a4b [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata
### What changes were proposed in this pull request?
Initial implementation of RocksDBCheckpointMetadata. It persists the metadata for RocksDBFileManager.

### Why are the changes needed?
The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains all RocksDB file names and the number of total keys.
The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS](https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2).

### Does this PR introduce _any_ user-facing change?
No. Internal implementation only.

### How was this patch tested?
New UT added.

Closes #32272 from xuanyuanking/SPARK-35172.

Lead-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-27 22:56:50 +09:00
dgd-contributor 52a1f8c000 [SPARK-33428][SQL] Match the behavior of conv function to MySQL's
### What changes were proposed in this pull request?
Spark conv function is from MySQL and it's better to follow the MySQL behavior. MySQL returns the max unsigned long if the input string is too big, and Spark should follow it.

However, seems Spark has different behavior in two cases:

MySQL allows leading spaces but Spark does not.
If the input string is way too long, Spark fails with ArrayIndexOutOfBoundException

This patch now help conv follow behavior in those two cases
conv allows leading spaces
conv will return the max unsigned long when the input string is way too long

### Why are the changes needed?
fixing it to match the behavior of conv function to the (almost) only one reference of another DBMS, MySQL

### Does this PR introduce _any_ user-facing change?
Yes, as pointed out above

### How was this patch tested?
Add test

Closes #32684 from dgd-contributor/SPARK-33428.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-27 12:12:39 +00:00
Gengliang Wang 5bcd1c29f0 [SPARK-35535][SQL] New data source V2 API: LocalScan
### What changes were proposed in this pull request?

Add a new data source V2 API: `LocalScan`. It is a special Scan that will happen on Driver locally instead of Executors.

### Why are the changes needed?

The new API improves the flexibility of the DSV2 API. It allows developers to implement connectors for data sources of small data sizes.
For example, we can build a data source for Spark History applications from Spark History Server RESTFUL API. The result set is small and fetching all the results from the Spark driver is good enough. Making it a data source allows us to operate SQL queries with filters or table joins.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test

Closes #32678 from gengliangwang/LocalScan.

Lead-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-27 19:31:56 +09:00
gengjiaan 3e190807bc [SPARK-35057][SQL] Group exception messages in hive/thriftserver
### What changes were proposed in this pull request?
This PR group exception messages in `sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32646 from beliefer/SPARK-35057.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-27 07:31:14 +00:00
Cheng Su 5cc17ba0c7 [SPARK-35351][SQL][FOLLOWUP] Avoid using loaded variable for LEFT ANTI SMJ code-gen
### What changes were proposed in this pull request?

This is a followup from https://github.com/apache/spark/pull/32547#discussion_r639916474, where for LEFT ANTI join, we do not need to depend on `loaded` variable, as in `codegenAnti` we only load `streamedAfter` no more than once (i.e. assign column values from streamed row which are not used in join condition).

### Why are the changes needed?

Avoid unnecessary processing in code-gen (though it's just `boolean $loaded = false;`, and `if (!$loaded) { $loaded = true; }`).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unite tests in `ExistenceJoinSuite`.

Closes #32681 from c21/join-followup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-27 04:59:54 +00:00
ulysses-you dc7b5a99f0 [SPARK-35282][SQL] Support AQE side shuffled hash join formula using rule
### What changes were proposed in this pull request?

The main code change is:
* Change rule `DemoteBroadcastHashJoin` to `DynamicJoinSelection` and add shuffle hash join selection code.
* Specify a join strategy hint `SHUFFLE_HASH` if AQE think a join can be converted to SHJ.
* Skip `preferSortMerge` config check in AQE side if a join can be converted to SHJ.

### Why are the changes needed?

Use AQE runtime statistics to decide if we can use shuffled hash join instead of sort merge join. Currently, the formula of shuffled hash join selection dose not work due to the dymanic shuffle partition number.

Add a new config spark.sql.adaptive.shuffledHashJoinLocalMapThreshold to decide if join can be converted to shuffled hash join safely.

### Does this PR introduce _any_ user-facing change?

Yes, add a new config.

### How was this patch tested?

Add test.

Closes #32550 from ulysses-you/SPARK-35282-2.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-26 14:16:04 +00:00
Cheng Su dd677770d8 [SPARK-35529][SQL] Add fallback metrics for hash aggregate
### What changes were proposed in this pull request?

Add the metrics to record how many tasks fallback to sort-based aggregation for hash aggregation. This will help developers and users to debug and optimize query. Object hash aggregation has similar metrics already.

### Why are the changes needed?

Help developers and users to debug and optimize query with hash aggregation.

### Does this PR introduce _any_ user-facing change?

Yes, the added metrics will show up in Spark web UI.
Example:
<img width="604" alt="Screen Shot 2021-05-26 at 12 17 08 AM" src="https://user-images.githubusercontent.com/4629931/119618437-bf3c5880-bdb7-11eb-89bb-5b88db78639f.png">

### How was this patch tested?

Changed unit test in `SQLMetricsSuite.scala`.

Closes #32671 from c21/agg-metrics.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-26 11:28:12 +00:00
Kousuke Saruta 50fefc6447 [SPARK-35527][SQL][TESTS] Fix HiveExternalCatalogVersionsSuite to pass with Java 11
### What changes were proposed in this pull request?

This PR fixes `HiveExternalCatalogVersionsSuite`.
With this change, only <major>.<minor> version is set to `spark.sql.hive.metastore.version`.

### Why are the changes needed?

I'm personally checking whether all the tests pass with Java 11 for the current `master` and I found `HiveExternalCatalogVersionsSuite` fails.
The reason is that Spark 3.0.2 and 3.1.1 doesn't accept `2.3.8` as a hive metastore version.

`HiveExternalCatalogVersionsSuite` downloads Spark releases from https://dist.apache.org/repos/dist/release/spark/ and run test for each release. The Spark releases are `3.0.2` and `3.1.1` for the current `master` for now.
e47e615c0e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L239-L259)

With Java 11, the suite run with a hive metastore version which corresponds to the builtin Hive version and it's `2.3.8` for the current `master`.
20750a3f9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L62-L66)

But `branch-3.0` and `branch-3.1` doesn't accept `2.3.8`, the suite with Java 11 fails.

Another solution would be backporting SPARK-34271 (#31371) but after [a discussion](https://github.com/apache/spark/pull/32668#issuecomment-848435170), we prefer to fix the test,

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests with CI.

Closes #32670 from sarutak/fix-version-suite-for-java11.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 17:20:51 +09:00
itholic 79a6b0cc8a [SPARK-35509][DOCS] Move text data source options from Python and Scala into a single page
### What changes were proposed in this pull request?

This PR proposes move text data source options from Python, Scala and Java into a single page.

### Why are the changes needed?

So far, the documentation for text data source options is separated into different pages for each language API documents. However, this makes managing many options inconvenient, so it is efficient to manage all options in a single page and provide a link to that page in the API of each language.

### Does this PR introduce _any_ user-facing change?

Yes, the documents will be shown below after this change:

- "Text Files" page
<img width="823" alt="Screen Shot 2021-05-26 at 3 20 11 PM" src="https://user-images.githubusercontent.com/44108233/119611669-f5202200-be35-11eb-9307-45846949d300.png">

- Python
<img width="791" alt="Screen Shot 2021-05-25 at 5 04 26 PM" src="https://user-images.githubusercontent.com/44108233/119462469-b9c11d00-bd7b-11eb-8f19-2ba7b9ceb318.png">

- Scala
<img width="683" alt="Screen Shot 2021-05-25 at 5 05 10 PM" src="https://user-images.githubusercontent.com/44108233/119462483-bd54a400-bd7b-11eb-8177-74e4d7035e63.png">

- Java
<img width="665" alt="Screen Shot 2021-05-25 at 5 05 36 PM" src="https://user-images.githubusercontent.com/44108233/119462501-bfb6fe00-bd7b-11eb-8161-12c58fabe7e2.png">

### How was this patch tested?

Manually build docs and confirm the page.

Closes #32660 from itholic/SPARK-35509.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 17:12:49 +09:00
Vinod KC e3c6907c99 [SPARK-35490][BUILD] Update json4s to 3.7.0-M11
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from   3.7.0-M5  to 3.7.0-M11

Note: json4s version greater than 3.7.0-M11 is not binary compatible with Spark third party jars

### Why are the changes needed?
Multiple defect fixes and improvements  like

https://github.com/json4s/json4s/issues/750
https://github.com/json4s/json4s/issues/554
https://github.com/json4s/json4s/issues/715

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran with the existing UTs

Closes #32636 from vinodkc/br_build_upgrade_json4s.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-05-26 11:10:14 +03:00
Linhong Liu af1dba7ca5 [SPARK-35440][SQL] Add function type to ExpressionInfo for UDF
### What changes were proposed in this pull request?
Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", "built-in" to the `ExpressionInfo` for UDF.

### Why are the changes needed?
Make the `ExpressionInfo` of UDF more meaningful

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
existing and newly added UT

Closes #32587 from linhongliu-db/udf-language.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-26 04:40:53 +00:00
Hyukjin Kwon 20750a3f9e [SPARK-32194][PYTHON] Use proper exception classes instead of plain Exception
### What changes were proposed in this pull request?

This PR proposes to use a proper built-in exceptions instead of the plain `Exception` in Python.

While I am here, I fixed another minor issue at `DataFrams.schema` together:

```diff
- except AttributeError as e:
-     raise Exception(
-         "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+     raise ValueError(
+         "Unable to parse datatype from schema. %s" % e) from e
```

Now it catches all exceptions during schema parsing, chains the exception with `ValueError`. Previously it only caught `AttributeError` that does not catch all cases.

### Why are the changes needed?

For users to expect the proper exceptions.

### Does this PR introduce _any_ user-facing change?

Yeah, the exception classes became different but should be compatible because previous exception was plain `Exception` which other exceptions inherit.

### How was this patch tested?

Existing unittests should cover,

Closes #31238

Closes #32650 from HyukjinKwon/SPARK-32194.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 11:54:40 +09:00
Wenchen Fan 859a53424a [SPARK-35447][SQL] Optimize skew join before coalescing shuffle partitions
### What changes were proposed in this pull request?

This PR improves the interaction between partition coalescing and skew handling by moving the skew join rule ahead of the partition coalescing rule and making corresponding changes to the two rules:
1. Simplify `OptimizeSkewedJoin` as it doesn't need to handle `CustomShuffleReaderExec` anymore.
2. Update `CoalesceShufflePartitions` to support coalescing non-skewed partitions.

### Why are the changes needed?

It's a bit hard to reason about skew join if partitions have been coalesced. A skewed partition needs to be much larger than other partitions and we need to look at the raw sizes before coalescing.

It also makes `OptimizeSkewedJoin` more robust, as we don't need to worry about a skewed partition being coalesced with a small partition and breaks skew join handling.

It also helps with https://github.com/apache/spark/pull/31653 , which needs to move `OptimizeSkewedJoin` to an earlier phase and run before `CoalesceShufflePartitions`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

new UT and existing tests

Closes #32594 from cloud-fan/shuffle.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-25 13:12:45 +00:00
ulysses-you 631077db08 [SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer
### What changes were proposed in this pull request?

* remove `EliminateUnnecessaryJoin`, using `AQEPropagateEmptyRelation` instead.
* eliminate join, aggregate, limit, repartition, sort, generate which is beneficial.

### Why are the changes needed?

Make `EliminateUnnecessaryJoin` available with more case.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add test.

Closes #32602 from ulysses-you/SPARK-35455.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-25 08:59:59 +00:00
tanel.kiis@gmail.com 548e37b00b [SPARK-33122][SQL][FOLLOWUP] Extend RemoveRedundantAggregates optimizer rule to apply to more cases
### What changes were proposed in this pull request?

Addressed the dongjoon-hyun comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example in
 ```
dataset
    .dropDuplicates()
    .groupBy('a)
    .agg(max('b))
```
the `dropDuplicates` is not needed, because the result on `max` does not depend on duplicate values.

### Why are the changes needed?

Improve performance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31914 from tanelk/SPARK-33122_redundant_aggs_followup.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-25 10:04:37 +09:00
Kousuke Saruta d4fb98354a [SPARK-35287][SQL] Allow RemoveRedundantProjects to preserve ProjectExec which generates UnsafeRow for DataSourceV2ScanRelation
### What changes were proposed in this pull request?

This PR fixes an issue that `RemoveRedundantProjects` removes `ProjectExec` which is for generating `UnsafeRow`.
In `DataSourceV2Strategy`, `ProjectExec` will be inserted to ensure internal rows are `UnsafeRow`.

```
  private def withProjectAndFilter(
      project: Seq[NamedExpression],
      filters: Seq[Expression],
      scan: LeafExecNode,
      needsUnsafeConversion: Boolean): SparkPlan = {
    val filterCondition = filters.reduceLeftOption(And)
    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

    if (withFilter.output != project || needsUnsafeConversion) {
      ProjectExec(project, withFilter)
    } else {
      withFilter
    }
  }
...
    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
      // projection and filters were already pushed down in the optimizer.
      // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
      // not support columnar, a projection is added to convert the rows to UnsafeRow.
      val batchExec = BatchScanExec(relation.output, relation.scan)
      withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading `ClassCastException`.

A concreate example to reproduce this issue is reported:
```
import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()
  test("RemoveRedundantProjects removes non-redundant projects") {
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
      SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data = spark.range(3).toDF
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()

        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info]  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info]  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32606 from sarutak/fix-project-removal-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-25 00:26:10 +08:00
Chao Sun c709efc1e7 [SPARK-34981][SQL][FOLLOWUP] Use SpecificInternalRow in ApplyFunctionExpression
### What changes were proposed in this pull request?

Use `SpecificInternalRow` instead of `GenericInternalRow` to avoid boxing / unboxing cost.

### Why are the changes needed?

Since it doesn't know the input row schema, `GenericInternalRow` potentially need to apply boxing for input arguments. It's better to use `SpecificInternalRow` instead since we know input data types.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32647 from sunchao/specific-input-row.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-24 17:25:24 +09:00
Adam Binford 6c0c617bd0 [SPARK-35449][SQL] Only extract common expressions from CaseWhen values if elseValue is set
### What changes were proposed in this pull request?

This PR fixes a bug with subexpression elimination for CaseWhen statements. https://github.com/apache/spark/pull/30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.

### Why are the changes needed?

Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.

### Does this PR introduce _any_ user-facing change?

Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```

`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.

### How was this patch tested?

Updated existing test with new case.

Closes #32595 from Kimahriman/bug-case-subexpr-elimination.

Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-05-24 00:27:41 -07:00
Liang-Chi Hsieh 9e1b204bcc [SPARK-35410][SQL] SubExpr elimination should not include redundant children exprs in conditional expression
### What changes were proposed in this pull request?

This patch fixes a bug when dealing with common expressions in conditional expressions such as `CaseWhen` during subexpression elimination.

For example, previously we find common expressions among conditions of `CaseWhen`, but children expressions are also counted into. We should not count these children expressions as common expressions.

### Why are the changes needed?

If the redundant children expressions are counted as common expressions too, they will be redundantly evaluated and miss the subexpression elimination opportunity.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests.

Closes #32559 from viirya/SPARK-35410.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-23 08:24:44 -07:00
Hyukjin Kwon 1d9f09decb [SPARK-35480][SQL] Make percentile_approx work with pivot
### What changes were proposed in this pull request?

This PR proposes to avoid wrapping if-else to the constant literals for `percentage` and `accuracy` in `percentile_approx`. They are expected to be literals (or foldable expressions).

Pivot works by two phrase aggregations, and it works with manipulating the input to `null` for non-matched values (pivot column and value).

Note that pivot supports an optimized version without such logic with changing input to `null` for some types (non-nested types basically). So the issue fixed by this PR is only for complex types.

```scala
val df = Seq(
  ("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
  .groupBy().pivot("type", Seq("a", "b")).agg(
    percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
df.show()
```

**Before:**

```
org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal;
'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18]
+- Project [_1#2 AS type#7, _2#3 AS value#8]
   +- LocalRelation [_1#2, _2#3]
```

**After:**

```
+-----+-----+
|    a|    b|
+-----+-----+
|[2.5]|[3.0]|
+-----+-----+
```

### Why are the changes needed?

To make percentile_approx work with pivot as expected

### Does this PR introduce _any_ user-facing change?

Yes. It threw an exception but now it returns a correct result as shown above.

### How was this patch tested?

Manually tested and unit test was added.

Closes #32619 from HyukjinKwon/SPARK-35480.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-23 07:35:43 +09:00
Wenchen Fan b624b7e93f [SPARK-28551][SQL][FOLLOWUP] Use the corrected hadoop conf
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32411, to fix a mistake and use `sparkSession.sessionState.newHadoopConf` which includes SQL configs instead of `sparkSession.sparkContext.hadoopConfiguration` .

### Why are the changes needed?

fix mistake

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #32618 from cloud-fan/follow1.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-05-22 10:33:57 +08:00
Liang-Chi Hsieh 066944c1bd [SPARK-35439][SQL] Children subexpr should come first than parent subexpr
### What changes were proposed in this pull request?

This patch sorts equivalent expressions based on their child-parent relation.

### Why are the changes needed?

`EquivalentExpressions` maintains a map of equivalent expressions. It is `HashMap` now so the insertion order is not guaranteed to be preserved later. Subexpression elimination relies on retrieving subexpressions from the map. If there is child-parent relationships among the subexpressions, we want the child expressions come first than parent expressions, so we can replace child expressions in parent expressions with subexpression evaluation.

For example, we have two different expressions `Add(Literal(1), Literal(2))` and `Add(Literal(3), add)`.

Case 1: child subexpr comes first.
```scala
addExprTree(add)
addExprTree(Add(Literal(3), add))
addExprTree(Add(Literal(3), add))
```

Case 2: parent subexpr comes first. For this case, we need to sort equivalent expressions.
```
addExprTree(Add(Literal(3), add))  => We add `Add(Literal(3), add)` into the map first, then add `add` into the map
addExprTree(add)
addExprTree(Add(Literal(3), add))
```

As we are going to sort equivalent expressions at all, we don't need `LinkedHashMap` but just do sorting.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests.

Closes #32586 from viirya/use-listhashmap.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-05-21 10:49:35 -07:00
Dongjoon Hyun cc05daa884 [SPARK-34558][SQL][TESTS][FOLLOWUP] Fix a test to use a unknown filesystem
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32622 to fix a test case.

### Why are the changes needed?

Fix a wrong test case name and fix the test case to cause the expected error correctly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #32623 from dongjoon-hyun/SPARK-34558.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-21 10:13:49 -07:00
Wenchen Fan 7274e3a4d2 [SPARK-34558][SQL][FOLLOWUP] Do not fail Spark startup with a broken FileSystem
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/31671

https://github.com/apache/spark/pull/31671 qualifies the warehouse at the beginning, which may fail Spark startup if something goes wrong, like the underlying FileSystem can't be initialized.

This PR falls back to the old behavior and leave the warehouse path unqualified if qualifying fails.

### Why are the changes needed?

Fix a regression. It's important to be always able to start Spark app (e.g. spark-shell), so that we can debug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

a new test case

Closes #32622 from cloud-fan/follow2.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-21 08:51:10 -07:00
itholic d2bdd6595e [SPARK-35025][SQL][PYTHON][DOCS] Move Parquet data source options from Python and Scala into a single page
### What changes were proposed in this pull request?

This PR proposes move Parquet data source options from Python, Scala and Java into a single page.

### Why are the changes needed?

So far, the documentation for Parquet data source options is separated into different pages for each language API documents. However, this makes managing many options inconvenient, so it is efficient to manage all options in a single page and provide a link to that page in the API of each language.

### Does this PR introduce _any_ user-facing change?

Yes, the documents will be shown below after this change:

- "Parquet Files" page
![Screen Shot 2021-05-21 at 1 35 08 PM](https://user-images.githubusercontent.com/44108233/119082866-e7375f00-ba39-11eb-9ade-a931a5957b34.png)

- Python
![Screen Shot 2021-05-21 at 1 38 27 PM](https://user-images.githubusercontent.com/44108233/119082879-eef70380-ba39-11eb-9e8e-ee50eed98dbe.png)

- Scala
![Screen Shot 2021-05-21 at 1 36 52 PM](https://user-images.githubusercontent.com/44108233/119082884-f1595d80-ba39-11eb-98d5-966657df65f7.png)

- Java
![Screen Shot 2021-05-21 at 1 37 19 PM](https://user-images.githubusercontent.com/44108233/119082888-f4544e00-ba39-11eb-8bf8-47ce78ec0b01.png)

### How was this patch tested?

Manually build docs and confirm the page.

Closes #32161 from itholic/SPARK-34491.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-21 18:05:49 +09:00
itholic 419ddcb2a4 [SPARK-34494][SQL][DOCS] Move JSON data source options from Python and Scala into a single page
### What changes were proposed in this pull request?

This PR proposes move JSON data source options from Python, Scala and Java into a single page.

### Why are the changes needed?

So far, the documentation for JSON data source options is separated into different pages for each language API documents. However, this makes managing many options inconvenient, so it is efficient to manage all options in a single page and provide a link to that page in the API of each language.

### Does this PR introduce _any_ user-facing change?

Yes, the documents will be shown below after this change:

- "JSON Files" page
<img width="876" alt="Screen Shot 2021-05-20 at 8 48 27 PM" src="https://user-images.githubusercontent.com/44108233/118973662-ddb3e580-b9ac-11eb-987c-8139aa9c3fe2.png">

- Python
<img width="714" alt="Screen Shot 2021-04-16 at 5 04 11 PM" src="https://user-images.githubusercontent.com/44108233/114992491-ca0cef00-9ed5-11eb-9d0f-4de60d8b2516.png">

- Scala
<img width="726" alt="Screen Shot 2021-04-16 at 5 04 54 PM" src="https://user-images.githubusercontent.com/44108233/114992594-e315a000-9ed5-11eb-8bd3-af7e568fcfe1.png">

- Java
<img width="911" alt="Screen Shot 2021-04-16 at 5 06 11 PM" src="https://user-images.githubusercontent.com/44108233/114992751-10624e00-9ed6-11eb-888c-8668d3c74289.png">

### How was this patch tested?

Manually build docs and confirm the page.

Closes #32204 from itholic/SPARK-35081.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-21 18:05:13 +09:00
itholic 0fe65b5365 [SPARK-35395][DOCS] Move ORC data source options from Python and Scala into a single page
### What changes were proposed in this pull request?

This PR proposes move ORC data source options from Python, Scala and Java into a single page.

### Why are the changes needed?

So far, the documentation for ORC data source options is separated into different pages for each language API documents. However, this makes managing many options inconvenient, so it is efficient to manage all options in a single page and provide a link to that page in the API of each language.

### Does this PR introduce _any_ user-facing change?

Yes, the documents will be shown below after this change:

- "ORC Files" page
![Screen Shot 2021-05-21 at 2 07 14 PM](https://user-images.githubusercontent.com/44108233/119085078-f4564d00-ba3d-11eb-8990-3ba031d809da.png)

- Python
![Screen Shot 2021-05-21 at 2 06 46 PM](https://user-images.githubusercontent.com/44108233/119085097-00daa580-ba3e-11eb-8017-ac5a95a7c053.png)

- Scala
![Screen Shot 2021-05-21 at 2 06 09 PM](https://user-images.githubusercontent.com/44108233/119085135-164fcf80-ba3e-11eb-9cac-78dded523f38.png)

- Java
![Screen Shot 2021-05-21 at 2 06 30 PM](https://user-images.githubusercontent.com/44108233/119085125-118b1b80-ba3e-11eb-9434-f26612d7da13.png)

### How was this patch tested?

Manually build docs and confirm the page.

Closes #32546 from itholic/SPARK-35395.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-21 18:03:57 +09:00
yi.wu 34284c0649 [SPARK-35454][SQL] One LogicalPlan can match multiple dataset ids
### What changes were proposed in this pull request?

Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.

### Why are the changes needed?

During the transformation from one Dataset to another Dataset, the DATASET_ID_TAG of logical plan won't change if the plan itself doesn't change:

b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L234-L237)

However, dataset id always changes even if the logical plan doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L207-L208)

And this can lead to the mismatch between dataset's id and col's __dataset_id. E.g.,

```scala
  test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
    // The test can fail if we change it to:
    // val df1 = spark.range(3).toDF()
    // val df2 = df1.filter($"id" > 0).toDF()
    val df1 = spark.range(3)
    val df2 = df1.filter($"id" > 0)

    withSQLConf(
      SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
      SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
      assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
    }
  }
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #32616 from Ngone51/fix-ambiguous-join.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 16:15:17 +08:00
ulysses-you 83737852f0 [SPARK-35063][SQL][FOLLOWUP] Fix scala 2.13 error
### What changes were proposed in this pull request?

### Why are the changes needed?

Fix scala compile error.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes #32617 from ulysses-you/scala2-13.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 07:59:26 +00:00
Max Gekk 2b08070e79 [SPARK-35427][SQL][TESTS] Check the EXCEPTION rebase mode for Avro/Parquet
### What changes were proposed in this pull request?
Add tests to check the `EXCEPTION` rebase mode explicitly in the datasources:
- Parquet: `DATE` type and `TIMESTAMP`: `INT96`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`
- Avro: `DATE` type and `TIMESTAMP`: `timestamp-millis` and `timestamp-micros`.

### Why are the changes needed?
1. To improve test coverage
2. The `EXCEPTION` rebase mode should be checked independently from the default settings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroV2Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```

Closes #32574 from MaxGekk/test-rebase-exception.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:18:06 +00:00
gengjiaan c740c097e0 [SPARK-35063][SQL] Group exception messages in sql/catalyst
### What changes were proposed in this pull request?
This PR group exception messages in `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32478 from beliefer/SPARK-35063.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:15:26 +00:00
Takeshi Yamamuro 1a923f5319 [SPARK-35479][SQL] Format PartitionFilters IN strings in scan nodes
### What changes were proposed in this pull request?

This PR proposes to format strings correctly for `PushedFilters`. For example, `explain()` for a query below prints `v in (array('a'))` as `PushedFilters: [In(v, [WrappedArray(a)])]`;

```
scala> sql("create table t (v array<string>) using parquet")
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [WrappedArray(a),null])], ReadSchema: struct<v:array<string>>
```
This PR makes `explain()` print it as `PushedFilters: [In(v, [[a]])]`;
```
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct<v:array<string>>
```
NOTE: This PR includes a bugfix caused by #32577 (See the cloud-fan comment: https://github.com/apache/spark/pull/32577/files#r636108150).

### Why are the changes needed?

To improve explain strings.

### Does this PR introduce _any_ user-facing change?

Yes, this PR improves the explain strings for pushed-down filters.

### How was this patch tested?

Added tests in `SQLQueryTestSuite`.

Closes #32615 from maropu/ExplainPartitionFilters.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 05:45:45 +00:00
yi.wu e1296eab5f [SPARK-35445][SQL] Reduce the execution time of DeduplicateRelations
### What changes were proposed in this pull request?

This PR reduces the execution time of `DeduplicateRelations` by:

1) use `Set` instead `Seq` to check duplicate relations

2) avoid plan output traverse and attribute rewrites when there are no changes in the children plan

### Why are the changes needed?

Rule `DeduplicateRelations` is slow.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run `TPCDSQuerySuite` and checked the run time of `DeduplicateRelations`. The time has been reduced by 77.9% after this PR.

Closes #32590 from Ngone51/improve-dedup.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
2021-05-21 13:25:37 +08:00
Kent Yao 2e9936db93 [SPARK-35456][CORE] Print the invalid value in config validation error message
### What changes were proposed in this pull request?

Print the invalid value in config validation error message for `checkValue` just like `checkValues`

### Why are the changes needed?

Invalid configuration values may come in many ways, this PR can help different kinds of users or developers to identify what the config the error is related to

### Does this PR introduce _any_ user-facing change?

yes, but only error msg
### How was this patch tested?

yes, modified tests

Closes #32600 from yaooqinn/SPARK-35456.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-21 14:22:29 +09:00
Vinod KC bdd8e1dbb1 [SPARK-28551][SQL] CTAS with LOCATION should not allow to a non-empty directory
### What changes were proposed in this pull request?

CTAS with location clause acts as an insert overwrite. This can cause problems when there are subdirectories within a location directory.
This causes some users to accidentally wipe out directories with very important data. We should not allow CTAS with location to a non-empty directory.

### Why are the changes needed?

Hive already handled this scenario: HIVE-11319

Steps to reproduce:

```scala
sql("""create external table  `demo_CTAS`( `comment` string) PARTITIONED BY (`col1` string, `col2` string) STORED AS parquet location '/tmp/u1/demo_CTAS'""")
sql("""INSERT OVERWRITE TABLE demo_CTAS partition (col1='1',col2='1') VALUES ('abc')""")
sql("select* from demo_CTAS").show
sql("""create table ctas1 location '/tmp/u2/ctas1' as select * from demo_CTAS""")
sql("select* from ctas1").show
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```

Before the fix: Both create table operations will succeed. But values in table ctas1 will be replaced by ctas2 accidentally.

After the fix: `create table ctas2...` will throw `AnalysisException`:

```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```

### Does this PR introduce _any_ user-facing change?
Yes, if the location directory is not empty, CTAS with location will throw AnalysisException

```
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```
```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```

`CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, need to  set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`. , default value is `false`.
Updated SQL migration guide.

### How was this patch tested?
Test case added in SQLQuerySuite.scala

Closes #32411 from vinodkc/br_fixCTAS_nonempty_dir.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-20 06:13:18 +00:00
Cheng Su 586caae3cc [SPARK-35438][SQL][DOCS] Minor documentation fix for window physical operator
### What changes were proposed in this pull request?

As title. Fixed two places where the documentation for window operator has some error.

### Why are the changes needed?

Help people read code for window operator more easily in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32585 from c21/minor-doc.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-20 08:47:19 +09:00
Andy Grove 52e3cf9ff5 [SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use
### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes #32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-05-19 07:45:26 -05:00
shahid 12142130cd [SPARK-35362][SQL] Update null count in the column stats for UNION operator stats estimation
### What changes were proposed in this pull request?
Updating column stats for Union operator stats estimation
### Why are the changes needed?
This is a followup PR to update the null count also in the Union stats operator estimation. https://github.com/apache/spark/pull/30334

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated UTs, manual testing

Closes #32494 from shahidki31/shahid/updateNullCountForUnion.

Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-19 21:23:19 +09:00
Kousuke Saruta 9283bebbbd [SPARK-35418][SQL] Add sentences function to functions.{scala,py}
### What changes were proposed in this pull request?

This PR adds `sentences`, a string function, which is present as of `2.0.0` but missing in `functions.{scala,py}`.

### Why are the changes needed?

This function can be only used from SQL for now.
It's good if we can use this function from Scala/Python code as well as SQL.

### Does this PR introduce _any_ user-facing change?

Yes. Users can use this function from Scala and Python.

### How was this patch tested?

New test.

Closes #32566 from sarutak/sentences-function.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-05-19 20:07:28 +09:00
shahid 46f7d780d3 [SPARK-35368][SQL] Update histogram statistics for RANGE operator for stats estimation
### What changes were proposed in this pull request?
Update histogram statistics for RANGE operator stats estimation
### Why are the changes needed?
If histogram optimization is enabled, this statistics can be used in various cost based optimizations.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UTs. Manual test.

Closes #32498 from shahidki31/shahid/histogram.

Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-19 16:49:32 +09:00