Commit graph

11590 commits

Author SHA1 Message Date
allisonwang-db aea36aa977 [SPARK-36028][SQL][3.2] Allow Project to host outer references in scalar subqueries
This PR cherry picks https://github.com/apache/spark/pull/33235 to branch-3.2 to fix test failures introduced by https://github.com/apache/spark/pull/33284.

### What changes were proposed in this pull request?
This PR allows the `Project` node to host outer references in scalar subqueries when `decorrelateInnerQuery` is enabled. It is already supported by the new decorrelation framework and the `RewriteCorrelatedScalarSubquery` rule.

Note currently by default all correlated subqueries will be decorrelated, which is not necessarily the most optimal approach. Consider `SELECT (SELECT c1) FROM t`. This should be optimized as `SELECT c1 FROM t` instead of rewriting it as a left outer join. This will be done in a separate PR to optimize correlated scalar/lateral subqueries with OneRowRelation.

### Why are the changes needed?
To allow more types of correlated scalar subqueries.

### Does this PR introduce _any_ user-facing change?
Yes. This PR allows outer query column references in the SELECT cluase of a correlated scalar subquery. For example:
```sql
SELECT (SELECT c1) FROM t;
```
Before this change:
```
org.apache.spark.sql.AnalysisException: Expressions referencing the outer query are not supported
outside of WHERE/HAVING clauses
```

After this change:
```
+------------------+
|scalarsubquery(c1)|
+------------------+
|0                 |
|1                 |
+------------------+
```

### How was this patch tested?
Added unit tests and SQL tests.

(cherry picked from commit ca348e50a4)
Signed-off-by: allisonwang-db <allison.wangdatabricks.com>

Closes #33527 from allisonwang-db/spark-36028-3.2.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 12:54:15 +08:00
Huaxin Gao 33ef52e2c0 [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up
### What changes were proposed in this pull request?
update java doc, JDBC data source doc, address follow up comments

### Why are the changes needed?
update doc and address follow up comments

### Does this PR introduce _any_ user-facing change?
Yes, add the new JDBC option `pushDownAggregate` in JDBC data source doc.

### How was this patch tested?
manually checked

Closes #33526 from huaxingao/aggPD_followup.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c8dd97d456)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 12:52:58 +08:00
Liang-Chi Hsieh dcd37f9639 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
This reverts commit 634f96dde4.

Closes #33533 from viirya/revert-SPARK-36136.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 22ac98dcbf)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-27 19:11:51 +09:00
Linhong Liu 91b9de3d80 [SPARK-36241][SQL] Support creating tables with null column
### What changes were proposed in this pull request?
Previously we blocked creating tables with the null column to follow the hive behavior in PR #28833
In this PR, I propose the restore the previous behavior to support the null column in a table.

### Why are the changes needed?
For a complex query, it's possible to generate a column with null type. If this happens to the input query of
CTAS, the query will fail due to Spark doesn't allow creating a table with null type. From the user's perspective,
it’s hard to figure out why the null type column is produced in the complicated query and how to fix it. So removing
this constraint is more friendly to users.

### Does this PR introduce _any_ user-facing change?
Yes, this reverts the previous behavior change in #28833, for example, below command will success after this PR
```sql
CREATE TABLE t (col_1 void, col_2 int)
```

### How was this patch tested?
newly added and existing test cases

Closes #33488 from linhongliu-db/SPARK-36241-support-void-column.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8e7e14dc0d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-27 17:32:16 +08:00
Wenchen Fan 14328e043d [SPARK-36247][SQL] Check string length for char/varchar and apply type coercion in UPDATE/MERGE command
### What changes were proposed in this pull request?

We added the char/varchar support in 3.1, but the string length check is only applied to INSERT, not UPDATE/MERGE. This PR fixes it. This PR also adds the missing type coercion for UPDATE/MERGE.

### Why are the changes needed?

complete the char/varchar support and make UPDATE/MERGE easier to use by doing type coercion.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new UT. No built-in source support UPDATE/MERGE so end-to-end test is not applicable here.

Closes #33468 from cloud-fan/char.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 068f8d434a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-27 13:57:26 +08:00
Chao Sun ae7b32a9e8 [SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package
### What changes were proposed in this pull request?

Move both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` to the package `org.apache.spark.sql.execution.datasources`. Did a few refactoring to enable this.

### Why are the changes needed?

Currently both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` are in package `org.apache.spark.sql.hive.execution` which doesn't look correct as these tests are not specific to Hive. Therefore, it's better to move them into `org.apache.spark.sql.execution.datasources`, the same place where the rule `PruneFileSourcePartitions` is at.

### Does this PR introduce _any_ user-facing change?

No, it's just test refactoring.

### How was this patch tested?

Using existing tests:
```
build/sbt "sql/testOnly *PruneFileSourcePartitionsSuite"
```
and
```
build/sbt "hive/testOnly *PruneHiveTablePartitionsSuite"
```

Closes #33350 from sunchao/SPARK-36136-partitions-suite.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 634f96dde4)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-26 13:04:06 -07:00
Hyukjin Kwon a77c9d6d17 [SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE
### What changes were proposed in this pull request?

This PR proposes to rename:

- Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution plan (user-facing doc/config name remain untouched)
  - `*ShuffleReaderExec` ->`*ShuffleReadExec`
  - `isLocalReader` -> `isLocalRead`
  - ...
- Rename `CustomShuffle*` prefix to `AQEShuffle*`
- Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead`

### Why are the changes needed?

There are multiple problems in the current naming:

- `CustomShuffle*` -> `AQEShuffle*`
    it sounds like it is a pluggable API. However, this is actually only used by AQE.
- `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead`
    it is the name of a rule but it can be misread as a reader, which is counterintuative
- `*ReaderExec` -> `*ReadExec`
    Reader execution reads a bit odd. It should better be read execution (like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find the reason to name it with something that performs an action. See also the generated plans:

    Before:

    ```
    ...
    * HashAggregate (12)
       +- CustomShuffleReader (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ...
    ```

    After:

    ```
    ...
    * HashAggregate (12)
       +- AQEShuffleRead (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ..
    ```

### Does this PR introduce _any_ user-facing change?

No, internal refactoring.

### How was this patch tested?

Existing unittests should cover the changes.

Closes #33429 from HyukjinKwon/SPARK-36217.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6e3d404cec)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 22:42:16 +08:00
Angerszhuuuu 07c7a6f739 [SPARK-34402][SQL] Group exception about data format schema
### What changes were proposed in this pull request?
Group exception about data format schema of different format, orc/parquet

### Why are the changes needed?
group exception

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not need

Closes #33296 from AngersZhuuuu/SPARK-34402.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a63802f2c6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 19:18:56 +08:00
Cheng Su f42cc10512 [SPARK-36269][SQL] Fix only set data columns to Hive column names config
### What changes were proposed in this pull request?

When reading Hive table, we set the Hive column id and column name configs (`hive.io.file.readcolumn.ids` and `hive.io.file.readcolumn.names`). We should set non-partition columns (data columns) for both configs, as Spark always [appends partition columns in its own Hive reader](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L240). The column id config has only non-partition columns, but column name config has both partition and non-partition columns. We should keep them to be consistent with only non-partition columns. This does not cause issue for public OSS Hive file format for now. But for customized internal Hive file format, it causes the issue as we are expecting these two configs to be same.

### Why are the changes needed?

Fix the code logic to be more consistent.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing Hive tests.

Closes #33489 from c21/hive-col.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e5616e32ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 18:48:26 +08:00
michaelzhang-db ec91818e14 [SPARK-36105][SQL] OptimizeLocalShuffleReader support reading data of multiple mappers in one task
### What changes were proposed in this pull request?
Added another partition spec to allow OptimizeLocalShuffleReader rule to read data from multiple mappers if the parallelism is less than the number of mappers.

### Why are the changes needed?
Optimization to the OptimizeLocalShuffleReader rule

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #33310 from michaelzhang-db/supportDataFromMultipleMappers.

Authored-by: michaelzhang-db <michael.zhang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 094ae3708f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 17:57:15 +08:00
Huaxin Gao b1f522cf97 [SPARK-34952][SQL] DSv2 Aggregate push down APIs
### What changes were proposed in this pull request?
Add interfaces and APIs to push down Aggregates to V2 Data Source

### Why are the changes needed?
improve performance

### Does this PR introduce _any_ user-facing change?
SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED was added. If this is set to true, Aggregates are pushed down to Data Source.

### How was this patch tested?
New tests were added to test aggregates push down in https://github.com/apache/spark/pull/32049.  The original PR is split into two PRs. This PR doesn't contain new tests.

Closes #33352 from huaxingao/aggPushDownInterface.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c561ee6865)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 16:01:43 +08:00
Liang-Chi Hsieh a6418a3463 [SPARK-36270][BUILD] Change memory settings for enabling GA
### What changes were proposed in this pull request?

Trying to adjust build memory settings and serial execution to re-enable GA.

### Why are the changes needed?

GA tests are failed recently due to return code 137. We need to adjust build settings to make GA work.

### Does this PR introduce _any_ user-facing change?

No, dev only.

### How was this patch tested?

GA

Closes #33447 from viirya/test-ga.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit fd36ed4550)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-23 19:11:09 +09:00
Gengliang Wang c5697d0f4a [SPARK-36257][SQL][3.2] Updated the version of TimestampNTZ related changes as 3.3.0
### What changes were proposed in this pull request?

As we decided to release TimestampNTZ type in Spark 3.3, we should update the versions of TimestampNTZ related changes as 3.3.0.

### Why are the changes needed?

Correct the versions in documentation/code comment.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #33480 from gengliangwang/updateVersion3.2.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-22 18:21:28 +03:00
Kousuke Saruta 3ee9a0db3a [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
### What changes were proposed in this pull request?

This PR extends the way to represent `delayThreshold` with ANSI interval literals for watermark.

### Why are the changes needed?

A `delayThreshold` is semantically an interval value so it's should be represented as ANSI interval literals as well as the conventional `1 second` form.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33456 from sarutak/delayThreshold-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 07fa38e2c1)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-22 17:36:52 +03:00
Angerszhuuuu 4a6f7d6c82 [SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect NULL DEFINED AS and default value should be \N
### What changes were proposed in this pull request?
SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
![image](https://user-images.githubusercontent.com/46485123/125775377-611d4f06-f9e5-453a-990d-5a0018774f43.png)
![image](https://user-images.githubusercontent.com/46485123/125775387-6618bd0c-78d8-4457-bcc2-12dd70522946.png)

### Why are the changes needed?
Keep consistence with Hive

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33363 from AngersZhuuuu/SPARK-36156.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit bb09bd2e2d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-22 17:28:52 +08:00
allisonwang-db 31bb9e04ad [SPARK-36063][SQL] Optimize OneRowRelation subqueries
### What changes were proposed in this pull request?
This PR adds optimization for scalar and lateral subqueries with OneRowRelation as leaf nodes. It inlines such subqueries before decorrelation to avoid rewriting them as left outer joins. It also introduces a flag to turn on/off this optimization: `spark.sql.optimizer.optimizeOneRowRelationSubquery` (default: True).

For example:
```sql
select (select c1) from t
```
Analyzed plan:
```
Project [scalar-subquery#17 [c1#18] AS scalarsubquery(c1)#22]
:  +- Project [outer(c1#18)]
:     +- OneRowRelation
+- LocalRelation [c1#18, c2#19]
```

Optimized plan before this PR:
```
Project [c1#18#25 AS scalarsubquery(c1)#22]
+- Join LeftOuter, (c1#24 <=> c1#18)
   :- LocalRelation [c1#18]
   +- Aggregate [c1#18], [c1#18 AS c1#18#25, c1#18 AS c1#24]
      +- LocalRelation [c1#18]
```

Optimized plan after this PR:
```
LocalRelation [scalarsubquery(c1)#22]
```

### Why are the changes needed?
To optimize query plans.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new unit tests.

Closes #33284 from allisonwang-db/spark-36063-optimize-subquery-one-row-relation.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit de8e4be92c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-22 10:48:48 +08:00
Kousuke Saruta 468165ae52 [SPARK-36208][SQL][3.2] SparkScriptTransformation should support ANSI interval types
### What changes were proposed in this pull request?

This PR changes `BaseScriptTransformationExec` for `SparkScriptTransformationExec` to support ANSI interval types.

### Why are the changes needed?

`SparkScriptTransformationExec` support `CalendarIntervalType` so it's better to support ANSI interval types as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Authored-by: Kousuke Saruta <sarutakoss.nttdata.com>
Signed-off-by: Max Gekk <max.gekkgmail.com>
(cherry picked from commit f56c7b71ff)
Signed-off-by: Max Gekk <max.gekkgmail.com>

Closes #33463 from MaxGekk/sarutak_script-transformation-interval-3.2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-21 20:54:18 +03:00
Gengliang Wang 99eb3ff226 [SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2
### What changes were proposed in this pull request?

Remove TimestampNTZ type support in the production code of Spark 3.2.
To archive the goal, this PR adds the check "Utils.isTesting" in the following code branches:
- keyword "timestamp_ntz" and "timestamp_ltz" in parser
- New expressions from https://issues.apache.org/jira/browse/SPARK-35662
- Using java.time.localDateTime as the external type for TimestampNTZType
- `SQLConf.timestampType` which determines the default timestamp type of Spark SQL.

This is to minimize the code difference between the master branch. So that future users won't think TimestampNTZ is already available in Spark 3.2.
The downside is that users can still find TimestampNTZType under package `org.apache.spark.sql.types`. There should be nothing left other than this.
### Why are the changes needed?

As of now, there are some blockers for delivering the TimestampNTZ project in Spark 3.2:

- In the Hive Thrift server, both TimestampType and TimestampNTZType are mapped to the same timestamp type, which can cause confusion for users.
- For the Parquet data source, the new written TimestampNTZType Parquet columns will be read as TimestampType in old Spark releases. Also, we need to decide the merge schema for files mixed with TimestampType and TimestampNTZ type.
- The type coercion rules for TimestampNTZType are incomplete. For example, what should the data type of the in clause "IN(Timestamp'2020-01-01 00:00:00', TimestampNtz'2020-01-01 00:00:00') be.
- It is tricky to support TimestampNTZType in JSON/CSV data readers. We need to avoid regressions as possible as we can.

There are 10 days left for the expected 3.2 RC date. So, I propose to **release the TimestampNTZ type in Spark 3.3 instead of Spark 3.2**. So that we have enough time to make considerate designs for the issues.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing Unit tests + manual tests from spark-shell to validate the changes are gone.
New functions
```
spark.sql("select to_timestamp_ntz'2021-01-01 00:00:00'").show()
spark.sql("select to_timestamp_ltz'2021-01-01 00:00:00'").show()
spark.sql("select make_timestamp_ntz(1,1,1,1,1,1)").show()
spark.sql("select make_timestamp_ltz(1,1,1,1,1,1)").show()
spark.sql("select localtimestamp()").show()
```
The SQL configuration `spark.sql.timestampType` should not work in 3.2
```
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.sql("select make_timestamp(1,1,1,1,1,1)").schema
spark.sql("select to_timestamp('2021-01-01 00:00:00')").schema
spark.sql("select timestamp'2021-01-01 00:00:00'").schema
Seq((1, java.sql.Timestamp.valueOf("2021-01-01 00:00:00"))).toDF("i", "ts").write.partitionBy("ts").parquet("/tmp/test")
spark.read.parquet("/tmp/test").schema
```
LocalDateTime is not supported as a built-in external type:
```
Seq(LocalDateTime.now()).toDF()
org.apache.spark.sql.catalyst.expressions.Literal(java.time.LocalDateTime.now())
org.apache.spark.sql.catalyst.expressions.Literal(0L, TimestampNTZType)
```

Closes #33444 from gengliangwang/banNTZ.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-21 09:55:09 -07:00
Kent Yao 7d363733ac [SPARK-36213][SQL] Normalize PartitionSpec for Describe Table Command with PartitionSpec
### What changes were proposed in this pull request?

This fixes a case sensitivity issue for desc table commands with partition specified.

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

yes, but it's a bugfix

### How was this patch tested?

new tests

#### before
```
+-- !query
+DESC EXTENDED t PARTITION (C='Us', D=1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Partition spec is invalid. The spec (C, D) must match the partition spec (c, d) defined in table '`default`.`t`'
+
```

#### after

https://github.com/apache/spark/pull/33424/files#diff-554189c49950974a948f99fa9b7436f615052511660c6a0ae3062fa8ca0a327cR328

Closes #33424 from yaooqinn/SPARK-36213.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 4cd6cfc773)
Signed-off-by: Kent Yao <yao@apache.org>
2021-07-22 00:53:12 +08:00
Shardul Mahadik 1ce678b2aa [SPARK-28266][SQL] convertToLogicalRelation should not interpret path property when reading Hive tables
### What changes were proposed in this pull request?

For non-datasource Hive tables, e.g. tables written outside of Spark (through Hive or Trino), we have certain optimzations in Spark where we use Spark ORC and Parquet datasources to read these tables ([Ref](fbf53dee37/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala (L128))) rather than using the Hive serde.
If such a table contains a `path` property, Spark will try to list this path property in addition to the table location when creating an `InMemoryFileIndex`. ([Ref](fbf53dee37/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (L575))) This can lead to wrong data if `path` property points to a directory location or an error if `path` is not a location. A concrete example is provided in [SPARK-28266 (comment)](https://issues.apache.org/jira/browse/SPARK-28266?focusedCommentId=17380170&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17380170).

Since these tables were not written through Spark, Spark should not interpret this `path` property as it can be set by an external system with a different meaning.

### Why are the changes needed?

For better compatibility with Hive tables generated by other platforms (non-Spark)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test

Closes #33328 from shardulm94/spark-28266.

Authored-by: Shardul Mahadik <smahadik@linkedin.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 685c3fd05b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-21 22:40:59 +08:00
Wenchen Fan f4291e373e [SPARK-36228][SQL] Skip splitting a skewed partition when some map outputs are removed
### What changes were proposed in this pull request?

Sometimes, AQE skew join optimization can fail with NPE. This is because AQE tries to get the shuffle block sizes, but some map outputs are missing due to the executor lost or something.

This PR fixes this bug by skipping skew join handling if some map outputs are missing in the `MapOutputTracker`.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new UT

Closes #33445 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9c8a3d3975)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-21 22:18:14 +08:00
Wenchen Fan b5c0f6c774 [SPARK-36020][SQL][FOLLOWUP] RemoveRedundantProjects should retain the LOGICAL_PLAN_TAG tag
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33222 .

https://github.com/apache/spark/pull/33222 made a mistake that, `RemoveRedundantProjects` may lose the `LOGICAL_PLAN_TAG` tag, even though the logical plan link is retained. This was actually caught by the test `LogicalPlanTagInSparkPlanSuite`, but was not being taken care of.

There is no problem so far, but losing information can always lead to potential bugs.

### Why are the changes needed?

fix a mistake

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #33442 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 94aece4325)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-21 14:03:22 +08:00
Rahul Mahadev 0d60cb51c0 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState
### What changes were proposed in this pull request?
Adding support for accepting an initial state with flatMapGroupsWithState in batch mode.

### Why are the changes needed?
SPARK-35897  added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR.

### Does this PR introduce _any_ user-facing change?

Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException

### How was this patch tested?

Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the  tests `JavaDatasetSuite`

Closes #33336 from rahulsmahadev/flatMapGroupsWithStateBatch.

Authored-by: Rahul Mahadev <rahul.mahadev@databricks.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
(cherry picked from commit efcce23b91)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2021-07-21 01:51:01 -04:00
Liang-Chi Hsieh 0b14ab12a2 [SPARK-36030][SQL][FOLLOW-UP][3.2] Remove duplicated test suiteRemove duplicated test suite
### What changes were proposed in this pull request?

Removes `FileFormatDataWriterMetricSuite` which duplicated.

### Why are the changes needed?

`FileFormatDataWriterMetricSuite` should be renamed to `InMemoryTableMetricSuite`. But it was wrongly copied.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33454 from viirya/SPARK-36030-followup-3.2.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 22:29:57 -07:00
Hyukjin Kwon 6041d1c51b [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13
### What changes were proposed in this pull request?

This PR avoid using procedure syntax deprecated in Scala 2.13.

https://github.com/apache/spark/runs/3120481756?check_suite_focus=true

```
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala:44:90: procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testMetricOnDSv2`'s return type
[error]   private def testMetricOnDSv2(func: String => Unit, checker: Map[Long, String] => Unit) {
[error]                                                                                          ^
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala:44:90: procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testMetricOnDSv2`'s return type
[error]   private def testMetricOnDSv2(func: String => Unit, checker: Map[Long, String] => Unit) {
[error]                                                                                          ^
[warn] 100 warnings found
[error] two errors found
[error] (sql / Test / compileIncremental) Compilation failed
[error] Total time: 579 s (09:39), completed Jul 21, 2021 4:14:26 AM
```

### Why are the changes needed?

To make the build compatible with Scala 2.13 in Spark.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested:

```bash
./dev/change-scala-version.sh 2.13
./build/mvn -DskipTests -Phive-2.3 -Phive clean package -Pscala-2.13
```

Closes #33452 from HyukjinKwon/SPARK-36030.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 99006e515b)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-21 14:09:35 +09:00
Liang-Chi Hsieh 86d1fb4698 [SPARK-36030][SQL] Support DS v2 metrics at writing path
### What changes were proposed in this pull request?

We add the interface for DS v2 metrics in SPARK-34366. It is only added for reading path, though. This patch extends the metrics interface to writing path.

### Why are the changes needed?

Complete DS v2 metrics interface support in writing path.

### Does this PR introduce _any_ user-facing change?

No. For developer, yes, as this adds metrics support at DS v2 writing path.

### How was this patch tested?

Added test.

Closes #33239 from viirya/v2-write-metrics.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 2653201b0a)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 20:20:48 -07:00
gengjiaan 9a7c59c99c [SPARK-36222][SQL] Step by days in the Sequence expression for dates
### What changes were proposed in this pull request?
The current implement of `Sequence` expression not support step by days for dates.
```
spark-sql> select sequence(date'2021-07-01', date'2021-07-10', interval '3' day);
Error in query: cannot resolve 'sequence(DATE '2021-07-01', DATE '2021-07-10', INTERVAL '3' DAY)' due to data type mismatch:
sequence uses the wrong parameter type. The parameter type must conform to:
1. The start and stop expressions must resolve to the same type.
2. If start and stop expressions resolve to the 'date' or 'timestamp' type
then the step expression must resolve to the 'interval' or
'interval year to month' or 'interval day to second' type,
otherwise to the same type as the start and stop expressions.
         ; line 1 pos 7;
'Project [unresolvedalias(sequence(2021-07-01, 2021-07-10, Some(INTERVAL '3' DAY), Some(Europe/Moscow)), None)]
+- OneRowRelation
```

### Why are the changes needed?
`DayTimeInterval` has day granularity should as step for dates.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Sequence expression will supports step by `DayTimeInterval` has day granularity for dates.

### How was this patch tested?
New tests.

Closes #33439 from beliefer/SPARK-36222.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit c0d84e6cf1)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-20 19:17:09 +03:00
Koert Kuipers a864388b5a [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
### What changes were proposed in this pull request?
Preserve the insertion order of columns in Dataset.withColumns

### Why are the changes needed?
It is the expected behavior. We preserve insertion order in all other places.

### Does this PR introduce _any_ user-facing change?
No. Currently Dataset.withColumns is not actually used anywhere to insert more than one column. This change is to make sure it behaves as expected when it is used for that purpose in future.

### How was this patch tested?
Added test in DatasetSuite

Closes #33423 from koertkuipers/feat-withcolumns-preserve-order.

Authored-by: Koert Kuipers <koert@tresata.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit bf680bf25a)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 09:09:34 -07:00
Karen Feng f55f8820fc [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
### What changes were proposed in this pull request?

Forces the selectivity estimate for null-based filters to be in the range `[0,1]`.

### Why are the changes needed?

I noticed in a few TPC-DS query tests that the column statistic null count can be higher than the table statistic row count. In the current implementation, the selectivity estimate for `IsNotNull` is negative.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #33286 from karenfeng/bound-selectivity-est.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ddc61e62b9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 21:32:30 +08:00
gengjiaan 0f6cf8abe3 [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ
### What changes were proposed in this pull request?
This PR follows https://github.com/apache/spark/pull/33299 and implement `prettyName` for `MakeTimestampNTZ` and `MakeTimestampLTZ` based on the discussion show below
https://github.com/apache/spark/pull/33299/files#r668423810

### Why are the changes needed?
This PR fix the incorrect alias usecase.

### Does this PR introduce _any_ user-facing change?
'No'.
Modifications are transparent to users.

### How was this patch tested?
Jenkins test.

Closes #33430 from beliefer/SPARK-36046-followup.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 033a5731b4)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-20 21:31:34 +08:00
Angerszhuuuu 7cd89efca5 [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too
### What changes were proposed in this pull request?
When inner field have wrong schema filed name should check field name too.
![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png)

### Why are the changes needed?
Early check early faield

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33409 from AngersZhuuuu/SPARK-36201.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 251885772d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 21:08:36 +08:00
ulysses-you 677104f495 [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
### What changes were proposed in this pull request?

* Add non-empty partition check in `CustomShuffleReaderExec`
* Make sure `OptimizeLocalShuffleReader` doesn't return empty partition

### Why are the changes needed?

Since SPARK-32083, AQE coalesce always return at least one partition, it should be robust to add non-empty check in `CustomShuffleReaderExec`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

not need

Closes #33431 from ulysses-you/non-empty-partition.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b70c25881c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 20:48:51 +08:00
Kent Yao 782dc9a795 [SPARK-36179][SQL] Support TimestampNTZType in SparkGetColumnsOperation
### What changes were proposed in this pull request?

Support TimestampNTZType in SparkGetColumnsOperation

### Why are the changes needed?

TimestampNTZType coverage

### Does this PR introduce _any_ user-facing change?

yes, jdbc end-users will be aware of TimestampNTZType

### How was this patch tested?

add new test

Closes #33393 from yaooqinn/SPARK-36179.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 0c76fb9c01)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-20 09:49:25 +09:00
gengjiaan ab4c160880 [SPARK-36091][SQL] Support TimestampNTZ type in expression TimeWindow
### What changes were proposed in this pull request?
The current implement of `TimeWindow` only supports `TimestampType`. Spark added a new type `TimestampNTZType`, so we should support `TimestampNTZType` in expression `TimeWindow`.

### Why are the changes needed?
 `TimestampNTZType` similar to `TimestampType`, we should support `TimestampNTZType` in expression `TimeWindow`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
`TimeWindow` will accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33341 from beliefer/SPARK-36091.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 7aa01798c5)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-19 19:24:01 +08:00
Angerszhuuuu 84a6fa22b3 [SPARK-36093][SQL] RemoveRedundantAliases should not change Command's parameter's expression's name
### What changes were proposed in this pull request?
RemoveRedundantAliases may change DataWritingCommand's parameter's attribute name.
In the UT's case before RemoveRedundantAliases the partitionColumns is `CAL_DT`, and change by RemoveRedundantAliases and change to `cal_dt` then case the error case

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
For below SQL case
```
sql("create table t1(cal_dt date) using parquet")
sql("insert into t1 values (date'2021-06-27'),(date'2021-06-28'),(date'2021-06-29'),(date'2021-06-30')")
sql("create view t1_v as select * from t1")
sql("CREATE TABLE t2 USING PARQUET PARTITIONED BY (CAL_DT) AS SELECT 1 AS FLAG,CAL_DT FROM t1_v WHERE CAL_DT BETWEEN '2021-06-27' AND '2021-06-28'")
sql("INSERT INTO t2 SELECT 2 AS FLAG,CAL_DT FROM t1_v WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'")
```

Before this pr
```
sql("SELECT * FROM t2 WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'").show
+----+------+
|FLAG|CAL_DT|
+----+------+
+----+------+
sql("SELECT * FROM t2 ").show
+----+----------+
|FLAG|    CAL_DT|
+----+----------+
|   1|2021-06-27|
|   1|2021-06-28|
+----+----------+
```

After this pr
```
sql("SELECT * FROM t2 WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'").show
+----+------+
|FLAG|CAL_DT|
+----+------+
|   2|2021-06-29|
|   2|2021-06-30|
+----+------+
sql("SELECT * FROM t2 ").show
+----+----------+
|FLAG|    CAL_DT|
+----+----------+
|   1|2021-06-27|
|   1|2021-06-28|
|   2|2021-06-29|
|   2|2021-06-30|
+----+----------+
```

### How was this patch tested?
Added UT

Closes #33324 from AngersZhuuuu/SPARK-36093.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 313f3c5460)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-19 16:22:47 +08:00
Kent Yao d0c4d224e0 [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables
### What changes were proposed in this pull request?

A hive partition can have different `PartitionDesc`s from `TableDesc` for describing Serde/InputFormatClass/OutputFormatClass, for a hive partitioned table, we shall respect those in `PartitionDesc`.

### Why are the changes needed?

in many cases, that Spark reads hive tables could result in surprise because of this issue.

### Does this PR introduce _any_ user-facing change?

yes, hive partition table that contains different serde/input/output could be recognized by Spark

### How was this patch tested?

new test added

Closes #33406 from yaooqinn/SPARK-36197.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit ef80356614)
Signed-off-by: Kent Yao <yao@apache.org>
2021-07-19 16:00:12 +08:00
Wenchen Fan 5b98ec2527 [SPARK-36184][SQL] Use ValidateRequirements instead of EnsureRequirements to skip AQE rules that adds extra shuffles
### What changes were proposed in this pull request?

Currently, two AQE rules `OptimizeLocalShuffleReader` and `OptimizeSkewedJoin` run `EnsureRequirements` at the end to check if there are extra shuffles in the optimized plan and revert the optimization if extra shuffles are introduced.

This PR proposes to run `ValidateRequirements` instead, which is much simpler than `EnsureRequirements`. This PR also moves this check to `AdaptiveSparkPlanExec`, so that it's centralized instead of in each rule. After centralization, the batch name of optimizing the final stage is the same as normal stages, which makes more sense.

### Why are the changes needed?

`EnsureRequirements` is a big rule and even contains optimizations (remove unnecessary shuffles). `ValidateRequirements` is much faster to run and can avoid potential bugs as it has no optimization and is a pure check.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests.

Closes #33396 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8396a70ddc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-19 14:14:58 +08:00
gengjiaan 85f70a1181 [SPARK-36090][SQL] Support TimestampNTZType in expression Sequence
### What changes were proposed in this pull request?
The current implement of `Sequence` accept `TimestampType`, `DateType` and `IntegralType`. This PR will let `Sequence` accepts `TimestampNTZType`.

### Why are the changes needed?
We can generate sequence for timestamp without time zone.

### Does this PR introduce _any_ user-facing change?
'Yes'.
This PR will let `Sequence` accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33360 from beliefer/SPARK-36090.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 42275bb20d)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-18 20:46:37 +03:00
Kousuke Saruta f7ed6fc6c6 [SPARK-36170][SQL] Change quoted interval literal (interval constructor) to be converted to ANSI interval types
### What changes were proposed in this pull request?

This PR changes the behavior of the quoted interval literals like `SELECT INTERVAL '1 year 2 month'` to be converted to ANSI interval types.

### Why are the changes needed?

The tnit-to-unit interval literals and the unit list interval literals are converted to ANSI interval types but quoted interval literals are still converted to CalendarIntervalType.

```
-- Unit list interval literals
spark-sql> select interval 1 year 2 month;
1-2
-- Quoted interval literals
spark-sql> select interval '1 year 2 month';
1 years 2 months
```

### Does this PR introduce _any_ user-facing change?

Yes but the following sentence in `sql-migration-guide.md` seems to cover this change.
```
  - In Spark 3.2, the unit list interval literals can not mix year-month fields (YEAR and MONTH) and day-time fields (WEEK, DAY, ..., MICROSECOND).
For example, `INTERVAL 1 day 1 hour` is invalid in Spark 3.2. In Spark 3.1 and earlier,
there is no such limitation and the literal returns value of `CalendarIntervalType`.
To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.interval.enabled` to `true`.
```

### How was this patch tested?

Modified existing tests and add new tests.

Closes #33380 from sarutak/fix-interval-constructor.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 71ea25d4f5)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-17 12:23:50 +03:00
Liang-Chi Hsieh 3d423b94a1 [SPARK-35785][SS][FOLLOWUP] Remove ignored test from RocksDBSuite
### What changes were proposed in this pull request?

This patch removes an ignored test from `RocksDBSuite`.

### Why are the changes needed?

The removed test is now ignored. The test itself doesn't look making sense. For example, the condition for capturing exception is never matched. The test runs updates to RocksDB instances at same remote dir with same versions. This doesn't look like a case it will run through in practice.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33401 from viirya/remove-ignore-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8009f0dd92)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-17 02:05:06 -07:00
Chao Sun a7c576ee14 [SPARK-36128][SQL] Apply spark.sql.hive.metastorePartitionPruning for non-Hive tables that uses Hive metastore for partition management
### What changes were proposed in this pull request?

In `CatalogFileIndex.filterPartitions`, check the config `spark.sql.hive.metastorePartitionPruning` and don't pushdown predicates to remote HMS if it is false. Instead, fallback to the `listPartitions` API and do the filtering on the client side.

### Why are the changes needed?

Currently the config `spark.sql.hive.metastorePartitionPruning` is only effective for Hive tables, and for non-Hive tables we'd always use the `listPartitionsByFilter` API from HMS client. On the other hand, by default all data source tables also manage their partitions through HMS, when the config `spark.sql.hive.manageFilesourcePartitions` is turned on. Therefore, it seems reasonable to extend the above config for non-Hive tables as well.

In certain cases the remote HMS service could throw exceptions when using the `listPartitionsByFilter` API, which, on the Spark side, is unrecoverable at the current state. Therefore it would be better to allow users to disable the API by using the above config.

For instance, HMS only allow pushdown date column when direct SQL is used instead of JDO for interacting with the underlying RDBMS, and will throw exception otherwise. Even though the Spark Hive client will attempt to recover itself when the exception happens, it only does so when the config `hive.metastore.try.direct.sql` from remote HMS is `false`. There could be cases where the value of `hive.metastore.try.direct.sql` is true but remote HMS still throws exception.

### Does this PR introduce _any_ user-facing change?

Yes now the config `spark.sql.hive.metastorePartitionPruning` is extended for non-Hive tables which use HMS to manage their partition metadata.

### How was this patch tested?

Added a new unit test:
```
build/sbt "hive/testOnly *PruneFileSourcePartitionsSuite -- -z SPARK-36128"
```

Closes #33348 from sunchao/SPARK-36128-by-filter.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 37dc3f9ea7)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-16 13:32:45 -07:00
Jungtaek Lim 4bfcdf38cf [SPARK-34893][SS] Support session window natively
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).

The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).

This PR leverages two different approaches on merging session windows:

1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards

First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.

This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.

The state format is versioned, so that we can bring a new state format if we find a better one.

### Why are the changes needed?

For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:

1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.

With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.

Quoting the query example from test suite:

```
    val inputData = MemoryStream[(String, Long)]

    // Split the lines into words, treat words as sessionId of events
    val events = inputData.toDF()
      .select($"_1".as("value"), $"_2".as("timestamp"))
      .withColumn("eventTime", $"timestamp".cast("timestamp"))
      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
      .withWatermark("eventTime", "30 seconds")

    val sessionUpdates = events
      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
      .agg(count("*").as("numEvents"))
      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
        "numEvents")
```

which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).

39542bb81f/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala (L66-L105)

(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)

### Does this PR introduce _any_ user-facing change?

Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".

### How was this patch tested?

New test suites. Also tested with benchmark code.

Closes #33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f2bf8b051b)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-16 20:38:35 +09:00
Ke Jia de3b8b996f [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
### What changes were proposed in this pull request?
This PR add the DPP + AQE support when spark can't reuse the broadcast but executing the DPP subquery is cheaper.

### Why are the changes needed?
Improve AQE + DPP

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Adding new ut

Closes #32861 from JkSelf/supportDPP3.

Lead-authored-by: Ke Jia <ke.a.jia@intel.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c1b3f86c58)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-16 16:01:23 +08:00
Steven Aerts 109247f02e [SPARK-35985][SQL] push partitionFilters for empty readDataSchema
this commit makes sure that for File Source V2 partition filters are
also taken into account when the readDataSchema is empty.
This is the case for queries like:

    SELECT count(*) FROM tbl WHERE partition=foo
    SELECT input_file_name() FROM tbl WHERE partition=foo

### What changes were proposed in this pull request?

As described in SPARK-35985 there is bug in the File Datasource V2 which prevents it to push down to the FileScanner for queries like the ones listed above.

### Why are the changes needed?

If partitions filters are not pushed down, the whole dataset will be scanned while only one partition is interesting.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

An extra test was added which relies on the output of explain, as is done in other places.

Closes #33191 from steven-aerts/SPARK-35985.

Authored-by: Steven Aerts <steven.aerts@airties.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f06aa4a3f3)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-16 04:53:23 +00:00
Max Gekk 57849a54da [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
### What changes were proposed in this pull request?
In the PR, I propose to propagate either the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` or/and Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses the settings in conversions of dates/timestamps instances from datasource filters to values pushed via `FilterApi` to the `parquet-column` lib.

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in Proleptic Gregorian calendar, and pushed to the parquet library as is. That works fine if timestamp/dates values in parquet files were saved in the `CORRECTED` mode but in the `LEGACY` mode, filter's values could not match to actual values.

After the changes, timestamp/dates values of filters pushed down to parquet libs such as `FilterApi.eq(col1, -719162)` are rebased according the rebase settings. For the example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description https://github.com/apache/spark/pull/28067 shows the diffs between two calendars.

### Why are the changes needed?
The changes fix the bug portrayed by the following example from SPARK-36034:
```scala
In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```
The result must have the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?
In some sense, yes. Query results can be different in some cases. For the example above:
```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?
By running the modified test suite `ParquetFilterSuite`:
```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit b09b7f7cc0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 22:22:09 +03:00
Gengliang Wang fce19ab31a [SPARK-36135][SQL] Support TimestampNTZ type in file partitioning
### What changes were proposed in this pull request?

Support TimestampNTZ type in file partitioning
* When there is no provided schema and the default Timestamp type is TimestampNTZ , Spark should infer and parse the timestamp value partitions as TimestampNTZ.
* When the provided Partition schema is TimestampNTZ, Spark should be able to parse the TimestampNTZ type partition column.

### Why are the changes needed?

File partitioning is an important feature and Spark should support TimestampNTZ type in it.

### Does this PR introduce _any_ user-facing change?

Yes, Spark supports TimestampNTZ type in file partitioning

### How was this patch tested?

Unit tests

Closes #33344 from gengliangwang/partition.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 96c2919988)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-16 01:13:49 +08:00
Yuming Wang 1ed72e2e8e [SPARK-32792][SQL][FOLLOWUP] Fix Parquet filter pushdown NOT IN predicate
### What changes were proposed in this pull request?

This pr fix Parquet filter pushdown `NOT` `IN` predicate if its values exceeds `spark.sql.parquet.pushdown.inFilterThreshold`. For example: `Not(In(a, Array(2, 3, 7))`. We can not push down `not(and(gteq(a, 2), lteq(a, 7)))`.

### Why are the changes needed?

Fix bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33365 from wangyum/SPARK-32792-3.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 0062c03c15)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 18:52:06 +03:00
PengLei 1a3c56d3ea [SPARK-29519][SQL][FOLLOWUP] Keep output is deterministic for show tblproperties
### What changes were proposed in this pull request?
Keep the output order is deterministic for `SHOW TBLPROPERTIES`

### Why are the changes needed?
[#33343](https://github.com/apache/spark/pull/33343#issue-689828187).
Keep the output order deterministic meaningful.

Since the properties are sorted and then compare result in the testcase for `SHOW TBLPROPERTIES`,  it does not fail, but ideally, the output is ordered and deterministic.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existed ut test

Closes #33353 from Peng-Lei/order-ouput-properties.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e05441c223)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-15 21:44:27 +08:00
Kousuke Saruta 42710991e2 [SPARK-33898][SQL][FOLLOWUP] Fix the behavior of SHOW CREATE TABLE to output deterministic results
### What changes were proposed in this pull request?

This PR fixes a behavior of `SHOW CREATE TABLE` added in `SPARK-33898` (#32931) to output deterministic result.
A test `SPARK-33898: SHOW CREATE TABLE` in `DataSourceV2SQLSuite` compares two `CREATE TABLE` statements. One is generated by `SHOW CREATE TABLE` against a created table and the other is expected `CREATE TABLE` statement.

The created table has options `from` and `to`, and they are declared in this order.
```
CREATE TABLE $t (
  a bigint NOT NULL,
  b bigint,
  c bigint,
  `extra col` ARRAY<INT>,
  `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>>
)
USING foo
OPTIONS (
  from = 0,
  to = 1)
COMMENT 'This is a comment'
TBLPROPERTIES ('prop1' = '1')
PARTITIONED BY (a)
LOCATION '/tmp'
```

And the expected `CREATE TABLE` in the test code is like as follows.
```
"CREATE TABLE testcat.ns1.ns2.tbl (",
"`a` BIGINT NOT NULL,",
"`b` BIGINT,",
"`c` BIGINT,",
"`extra col` ARRAY<INT>,",
"`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)",
"USING foo",
"OPTIONS(",
"'from' = '0',",
"'to' = '1')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION '/tmp'",
"TBLPROPERTIES(",
"'prop1' = '1')"
```
As you can see, the order of `from` and `to` is expected.
But options are implemented as `Map` so the order of key cannot be kept.

In fact, this test fails with Scala 2.13.
```
[info] - SPARK-33898: SHOW CREATE TABLE *** FAILED *** (515 milliseconds)
[info]   Array("CREATE TABLE testcat.ns1.ns2.tbl (", "`a` BIGINT NOT NULL,", "`b` BIGINT,", "`c` BIGINT,", "`extra col` ARRAY<INT>,", "`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)", "USING foo", "OPTIONS(", "'to' = '1',", "'from' = '0')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", "LOCATION '/tmp'", "TBLPROPERTIES(", "'prop1' = '1')") did not equal Array("CREATE TABLE testcat.ns1.ns2.tbl (", "`a` BIGINT NOT NULL,", "`b` BIGINT,", "`c` BIGINT,", "`extra col` ARRAY<INT>,", "`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)", "USING foo", "OPTIONS(", "'from' = '0',", "'to' = '1')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", "LOCATION '/tmp'", "TBLPROPERTIES(", "'prop1' = '1')") (DataSourceV2SQLSuite.scala:1997)
```
In the current master, the test doesn't fail with Scala 2.12 but it's still non-deterministic.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that the modified test passed with both Scala 2.12 and Scala 2.13 with this change.

Closes #33343 from sarutak/fix-show-create-table-test.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f95ca31c0f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-15 20:53:32 +09:00
Gengliang Wang 75ff69a994 [SPARK-36037][TESTS][FOLLOWUP] Avoid wrong test results on daylight saving time
### What changes were proposed in this pull request?

Only use the zone ids that has no daylight saving for testing `localtimestamp`

### Why are the changes needed?

https://github.com/apache/spark/pull/33346#discussion_r670135296 MaxGekk suggests that we should avoid wrong results if possible.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

Unit test

Closes #33354 from gengliangwang/FIxDST.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 564d3de7c6)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 11:41:04 +03:00