Commit graph

11569 commits

Author SHA1 Message Date
Wenchen Fan b5c0f6c774 [SPARK-36020][SQL][FOLLOWUP] RemoveRedundantProjects should retain the LOGICAL_PLAN_TAG tag
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33222 .

https://github.com/apache/spark/pull/33222 made a mistake that, `RemoveRedundantProjects` may lose the `LOGICAL_PLAN_TAG` tag, even though the logical plan link is retained. This was actually caught by the test `LogicalPlanTagInSparkPlanSuite`, but was not being taken care of.

There is no problem so far, but losing information can always lead to potential bugs.

### Why are the changes needed?

fix a mistake

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #33442 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 94aece4325)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-21 14:03:22 +08:00
Rahul Mahadev 0d60cb51c0 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState
### What changes were proposed in this pull request?
Adding support for accepting an initial state with flatMapGroupsWithState in batch mode.

### Why are the changes needed?
SPARK-35897  added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR.

### Does this PR introduce _any_ user-facing change?

Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException

### How was this patch tested?

Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the  tests `JavaDatasetSuite`

Closes #33336 from rahulsmahadev/flatMapGroupsWithStateBatch.

Authored-by: Rahul Mahadev <rahul.mahadev@databricks.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
(cherry picked from commit efcce23b91)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2021-07-21 01:51:01 -04:00
Liang-Chi Hsieh 0b14ab12a2 [SPARK-36030][SQL][FOLLOW-UP][3.2] Remove duplicated test suiteRemove duplicated test suite
### What changes were proposed in this pull request?

Removes `FileFormatDataWriterMetricSuite` which duplicated.

### Why are the changes needed?

`FileFormatDataWriterMetricSuite` should be renamed to `InMemoryTableMetricSuite`. But it was wrongly copied.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33454 from viirya/SPARK-36030-followup-3.2.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 22:29:57 -07:00
Hyukjin Kwon 6041d1c51b [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13
### What changes were proposed in this pull request?

This PR avoid using procedure syntax deprecated in Scala 2.13.

https://github.com/apache/spark/runs/3120481756?check_suite_focus=true

```
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala:44:90: procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testMetricOnDSv2`'s return type
[error]   private def testMetricOnDSv2(func: String => Unit, checker: Map[Long, String] => Unit) {
[error]                                                                                          ^
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala:44:90: procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testMetricOnDSv2`'s return type
[error]   private def testMetricOnDSv2(func: String => Unit, checker: Map[Long, String] => Unit) {
[error]                                                                                          ^
[warn] 100 warnings found
[error] two errors found
[error] (sql / Test / compileIncremental) Compilation failed
[error] Total time: 579 s (09:39), completed Jul 21, 2021 4:14:26 AM
```

### Why are the changes needed?

To make the build compatible with Scala 2.13 in Spark.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested:

```bash
./dev/change-scala-version.sh 2.13
./build/mvn -DskipTests -Phive-2.3 -Phive clean package -Pscala-2.13
```

Closes #33452 from HyukjinKwon/SPARK-36030.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 99006e515b)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-21 14:09:35 +09:00
Liang-Chi Hsieh 86d1fb4698 [SPARK-36030][SQL] Support DS v2 metrics at writing path
### What changes were proposed in this pull request?

We add the interface for DS v2 metrics in SPARK-34366. It is only added for reading path, though. This patch extends the metrics interface to writing path.

### Why are the changes needed?

Complete DS v2 metrics interface support in writing path.

### Does this PR introduce _any_ user-facing change?

No. For developer, yes, as this adds metrics support at DS v2 writing path.

### How was this patch tested?

Added test.

Closes #33239 from viirya/v2-write-metrics.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 2653201b0a)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 20:20:48 -07:00
gengjiaan 9a7c59c99c [SPARK-36222][SQL] Step by days in the Sequence expression for dates
### What changes were proposed in this pull request?
The current implement of `Sequence` expression not support step by days for dates.
```
spark-sql> select sequence(date'2021-07-01', date'2021-07-10', interval '3' day);
Error in query: cannot resolve 'sequence(DATE '2021-07-01', DATE '2021-07-10', INTERVAL '3' DAY)' due to data type mismatch:
sequence uses the wrong parameter type. The parameter type must conform to:
1. The start and stop expressions must resolve to the same type.
2. If start and stop expressions resolve to the 'date' or 'timestamp' type
then the step expression must resolve to the 'interval' or
'interval year to month' or 'interval day to second' type,
otherwise to the same type as the start and stop expressions.
         ; line 1 pos 7;
'Project [unresolvedalias(sequence(2021-07-01, 2021-07-10, Some(INTERVAL '3' DAY), Some(Europe/Moscow)), None)]
+- OneRowRelation
```

### Why are the changes needed?
`DayTimeInterval` has day granularity should as step for dates.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Sequence expression will supports step by `DayTimeInterval` has day granularity for dates.

### How was this patch tested?
New tests.

Closes #33439 from beliefer/SPARK-36222.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit c0d84e6cf1)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-20 19:17:09 +03:00
Koert Kuipers a864388b5a [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
### What changes were proposed in this pull request?
Preserve the insertion order of columns in Dataset.withColumns

### Why are the changes needed?
It is the expected behavior. We preserve insertion order in all other places.

### Does this PR introduce _any_ user-facing change?
No. Currently Dataset.withColumns is not actually used anywhere to insert more than one column. This change is to make sure it behaves as expected when it is used for that purpose in future.

### How was this patch tested?
Added test in DatasetSuite

Closes #33423 from koertkuipers/feat-withcolumns-preserve-order.

Authored-by: Koert Kuipers <koert@tresata.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit bf680bf25a)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-20 09:09:34 -07:00
Karen Feng f55f8820fc [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
### What changes were proposed in this pull request?

Forces the selectivity estimate for null-based filters to be in the range `[0,1]`.

### Why are the changes needed?

I noticed in a few TPC-DS query tests that the column statistic null count can be higher than the table statistic row count. In the current implementation, the selectivity estimate for `IsNotNull` is negative.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #33286 from karenfeng/bound-selectivity-est.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ddc61e62b9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 21:32:30 +08:00
gengjiaan 0f6cf8abe3 [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ
### What changes were proposed in this pull request?
This PR follows https://github.com/apache/spark/pull/33299 and implement `prettyName` for `MakeTimestampNTZ` and `MakeTimestampLTZ` based on the discussion show below
https://github.com/apache/spark/pull/33299/files#r668423810

### Why are the changes needed?
This PR fix the incorrect alias usecase.

### Does this PR introduce _any_ user-facing change?
'No'.
Modifications are transparent to users.

### How was this patch tested?
Jenkins test.

Closes #33430 from beliefer/SPARK-36046-followup.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 033a5731b4)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-20 21:31:34 +08:00
Angerszhuuuu 7cd89efca5 [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too
### What changes were proposed in this pull request?
When inner field have wrong schema filed name should check field name too.
![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png)

### Why are the changes needed?
Early check early faield

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33409 from AngersZhuuuu/SPARK-36201.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 251885772d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 21:08:36 +08:00
ulysses-you 677104f495 [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
### What changes were proposed in this pull request?

* Add non-empty partition check in `CustomShuffleReaderExec`
* Make sure `OptimizeLocalShuffleReader` doesn't return empty partition

### Why are the changes needed?

Since SPARK-32083, AQE coalesce always return at least one partition, it should be robust to add non-empty check in `CustomShuffleReaderExec`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

not need

Closes #33431 from ulysses-you/non-empty-partition.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b70c25881c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-20 20:48:51 +08:00
Kent Yao 782dc9a795 [SPARK-36179][SQL] Support TimestampNTZType in SparkGetColumnsOperation
### What changes were proposed in this pull request?

Support TimestampNTZType in SparkGetColumnsOperation

### Why are the changes needed?

TimestampNTZType coverage

### Does this PR introduce _any_ user-facing change?

yes, jdbc end-users will be aware of TimestampNTZType

### How was this patch tested?

add new test

Closes #33393 from yaooqinn/SPARK-36179.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 0c76fb9c01)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-20 09:49:25 +09:00
gengjiaan ab4c160880 [SPARK-36091][SQL] Support TimestampNTZ type in expression TimeWindow
### What changes were proposed in this pull request?
The current implement of `TimeWindow` only supports `TimestampType`. Spark added a new type `TimestampNTZType`, so we should support `TimestampNTZType` in expression `TimeWindow`.

### Why are the changes needed?
 `TimestampNTZType` similar to `TimestampType`, we should support `TimestampNTZType` in expression `TimeWindow`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
`TimeWindow` will accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33341 from beliefer/SPARK-36091.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 7aa01798c5)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-19 19:24:01 +08:00
Angerszhuuuu 84a6fa22b3 [SPARK-36093][SQL] RemoveRedundantAliases should not change Command's parameter's expression's name
### What changes were proposed in this pull request?
RemoveRedundantAliases may change DataWritingCommand's parameter's attribute name.
In the UT's case before RemoveRedundantAliases the partitionColumns is `CAL_DT`, and change by RemoveRedundantAliases and change to `cal_dt` then case the error case

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
For below SQL case
```
sql("create table t1(cal_dt date) using parquet")
sql("insert into t1 values (date'2021-06-27'),(date'2021-06-28'),(date'2021-06-29'),(date'2021-06-30')")
sql("create view t1_v as select * from t1")
sql("CREATE TABLE t2 USING PARQUET PARTITIONED BY (CAL_DT) AS SELECT 1 AS FLAG,CAL_DT FROM t1_v WHERE CAL_DT BETWEEN '2021-06-27' AND '2021-06-28'")
sql("INSERT INTO t2 SELECT 2 AS FLAG,CAL_DT FROM t1_v WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'")
```

Before this pr
```
sql("SELECT * FROM t2 WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'").show
+----+------+
|FLAG|CAL_DT|
+----+------+
+----+------+
sql("SELECT * FROM t2 ").show
+----+----------+
|FLAG|    CAL_DT|
+----+----------+
|   1|2021-06-27|
|   1|2021-06-28|
+----+----------+
```

After this pr
```
sql("SELECT * FROM t2 WHERE CAL_DT BETWEEN '2021-06-29' AND '2021-06-30'").show
+----+------+
|FLAG|CAL_DT|
+----+------+
|   2|2021-06-29|
|   2|2021-06-30|
+----+------+
sql("SELECT * FROM t2 ").show
+----+----------+
|FLAG|    CAL_DT|
+----+----------+
|   1|2021-06-27|
|   1|2021-06-28|
|   2|2021-06-29|
|   2|2021-06-30|
+----+----------+
```

### How was this patch tested?
Added UT

Closes #33324 from AngersZhuuuu/SPARK-36093.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 313f3c5460)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-19 16:22:47 +08:00
Kent Yao d0c4d224e0 [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables
### What changes were proposed in this pull request?

A hive partition can have different `PartitionDesc`s from `TableDesc` for describing Serde/InputFormatClass/OutputFormatClass, for a hive partitioned table, we shall respect those in `PartitionDesc`.

### Why are the changes needed?

in many cases, that Spark reads hive tables could result in surprise because of this issue.

### Does this PR introduce _any_ user-facing change?

yes, hive partition table that contains different serde/input/output could be recognized by Spark

### How was this patch tested?

new test added

Closes #33406 from yaooqinn/SPARK-36197.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit ef80356614)
Signed-off-by: Kent Yao <yao@apache.org>
2021-07-19 16:00:12 +08:00
Wenchen Fan 5b98ec2527 [SPARK-36184][SQL] Use ValidateRequirements instead of EnsureRequirements to skip AQE rules that adds extra shuffles
### What changes were proposed in this pull request?

Currently, two AQE rules `OptimizeLocalShuffleReader` and `OptimizeSkewedJoin` run `EnsureRequirements` at the end to check if there are extra shuffles in the optimized plan and revert the optimization if extra shuffles are introduced.

This PR proposes to run `ValidateRequirements` instead, which is much simpler than `EnsureRequirements`. This PR also moves this check to `AdaptiveSparkPlanExec`, so that it's centralized instead of in each rule. After centralization, the batch name of optimizing the final stage is the same as normal stages, which makes more sense.

### Why are the changes needed?

`EnsureRequirements` is a big rule and even contains optimizations (remove unnecessary shuffles). `ValidateRequirements` is much faster to run and can avoid potential bugs as it has no optimization and is a pure check.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests.

Closes #33396 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8396a70ddc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-19 14:14:58 +08:00
gengjiaan 85f70a1181 [SPARK-36090][SQL] Support TimestampNTZType in expression Sequence
### What changes were proposed in this pull request?
The current implement of `Sequence` accept `TimestampType`, `DateType` and `IntegralType`. This PR will let `Sequence` accepts `TimestampNTZType`.

### Why are the changes needed?
We can generate sequence for timestamp without time zone.

### Does this PR introduce _any_ user-facing change?
'Yes'.
This PR will let `Sequence` accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33360 from beliefer/SPARK-36090.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 42275bb20d)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-18 20:46:37 +03:00
Kousuke Saruta f7ed6fc6c6 [SPARK-36170][SQL] Change quoted interval literal (interval constructor) to be converted to ANSI interval types
### What changes were proposed in this pull request?

This PR changes the behavior of the quoted interval literals like `SELECT INTERVAL '1 year 2 month'` to be converted to ANSI interval types.

### Why are the changes needed?

The tnit-to-unit interval literals and the unit list interval literals are converted to ANSI interval types but quoted interval literals are still converted to CalendarIntervalType.

```
-- Unit list interval literals
spark-sql> select interval 1 year 2 month;
1-2
-- Quoted interval literals
spark-sql> select interval '1 year 2 month';
1 years 2 months
```

### Does this PR introduce _any_ user-facing change?

Yes but the following sentence in `sql-migration-guide.md` seems to cover this change.
```
  - In Spark 3.2, the unit list interval literals can not mix year-month fields (YEAR and MONTH) and day-time fields (WEEK, DAY, ..., MICROSECOND).
For example, `INTERVAL 1 day 1 hour` is invalid in Spark 3.2. In Spark 3.1 and earlier,
there is no such limitation and the literal returns value of `CalendarIntervalType`.
To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.interval.enabled` to `true`.
```

### How was this patch tested?

Modified existing tests and add new tests.

Closes #33380 from sarutak/fix-interval-constructor.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 71ea25d4f5)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-17 12:23:50 +03:00
Liang-Chi Hsieh 3d423b94a1 [SPARK-35785][SS][FOLLOWUP] Remove ignored test from RocksDBSuite
### What changes were proposed in this pull request?

This patch removes an ignored test from `RocksDBSuite`.

### Why are the changes needed?

The removed test is now ignored. The test itself doesn't look making sense. For example, the condition for capturing exception is never matched. The test runs updates to RocksDB instances at same remote dir with same versions. This doesn't look like a case it will run through in practice.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33401 from viirya/remove-ignore-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8009f0dd92)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-17 02:05:06 -07:00
Chao Sun a7c576ee14 [SPARK-36128][SQL] Apply spark.sql.hive.metastorePartitionPruning for non-Hive tables that uses Hive metastore for partition management
### What changes were proposed in this pull request?

In `CatalogFileIndex.filterPartitions`, check the config `spark.sql.hive.metastorePartitionPruning` and don't pushdown predicates to remote HMS if it is false. Instead, fallback to the `listPartitions` API and do the filtering on the client side.

### Why are the changes needed?

Currently the config `spark.sql.hive.metastorePartitionPruning` is only effective for Hive tables, and for non-Hive tables we'd always use the `listPartitionsByFilter` API from HMS client. On the other hand, by default all data source tables also manage their partitions through HMS, when the config `spark.sql.hive.manageFilesourcePartitions` is turned on. Therefore, it seems reasonable to extend the above config for non-Hive tables as well.

In certain cases the remote HMS service could throw exceptions when using the `listPartitionsByFilter` API, which, on the Spark side, is unrecoverable at the current state. Therefore it would be better to allow users to disable the API by using the above config.

For instance, HMS only allow pushdown date column when direct SQL is used instead of JDO for interacting with the underlying RDBMS, and will throw exception otherwise. Even though the Spark Hive client will attempt to recover itself when the exception happens, it only does so when the config `hive.metastore.try.direct.sql` from remote HMS is `false`. There could be cases where the value of `hive.metastore.try.direct.sql` is true but remote HMS still throws exception.

### Does this PR introduce _any_ user-facing change?

Yes now the config `spark.sql.hive.metastorePartitionPruning` is extended for non-Hive tables which use HMS to manage their partition metadata.

### How was this patch tested?

Added a new unit test:
```
build/sbt "hive/testOnly *PruneFileSourcePartitionsSuite -- -z SPARK-36128"
```

Closes #33348 from sunchao/SPARK-36128-by-filter.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 37dc3f9ea7)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-16 13:32:45 -07:00
Jungtaek Lim 4bfcdf38cf [SPARK-34893][SS] Support session window natively
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).

The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).

This PR leverages two different approaches on merging session windows:

1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards

First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.

This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.

The state format is versioned, so that we can bring a new state format if we find a better one.

### Why are the changes needed?

For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:

1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.

With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.

Quoting the query example from test suite:

```
    val inputData = MemoryStream[(String, Long)]

    // Split the lines into words, treat words as sessionId of events
    val events = inputData.toDF()
      .select($"_1".as("value"), $"_2".as("timestamp"))
      .withColumn("eventTime", $"timestamp".cast("timestamp"))
      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
      .withWatermark("eventTime", "30 seconds")

    val sessionUpdates = events
      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
      .agg(count("*").as("numEvents"))
      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
        "numEvents")
```

which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).

39542bb81f/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala (L66-L105)

(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)

### Does this PR introduce _any_ user-facing change?

Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".

### How was this patch tested?

New test suites. Also tested with benchmark code.

Closes #33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f2bf8b051b)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-16 20:38:35 +09:00
Ke Jia de3b8b996f [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange
### What changes were proposed in this pull request?
This PR add the DPP + AQE support when spark can't reuse the broadcast but executing the DPP subquery is cheaper.

### Why are the changes needed?
Improve AQE + DPP

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Adding new ut

Closes #32861 from JkSelf/supportDPP3.

Lead-authored-by: Ke Jia <ke.a.jia@intel.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c1b3f86c58)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-16 16:01:23 +08:00
Steven Aerts 109247f02e [SPARK-35985][SQL] push partitionFilters for empty readDataSchema
this commit makes sure that for File Source V2 partition filters are
also taken into account when the readDataSchema is empty.
This is the case for queries like:

    SELECT count(*) FROM tbl WHERE partition=foo
    SELECT input_file_name() FROM tbl WHERE partition=foo

### What changes were proposed in this pull request?

As described in SPARK-35985 there is bug in the File Datasource V2 which prevents it to push down to the FileScanner for queries like the ones listed above.

### Why are the changes needed?

If partitions filters are not pushed down, the whole dataset will be scanned while only one partition is interesting.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

An extra test was added which relies on the output of explain, as is done in other places.

Closes #33191 from steven-aerts/SPARK-35985.

Authored-by: Steven Aerts <steven.aerts@airties.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f06aa4a3f3)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-16 04:53:23 +00:00
Max Gekk 57849a54da [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
### What changes were proposed in this pull request?
In the PR, I propose to propagate either the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` or/and Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses the settings in conversions of dates/timestamps instances from datasource filters to values pushed via `FilterApi` to the `parquet-column` lib.

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in Proleptic Gregorian calendar, and pushed to the parquet library as is. That works fine if timestamp/dates values in parquet files were saved in the `CORRECTED` mode but in the `LEGACY` mode, filter's values could not match to actual values.

After the changes, timestamp/dates values of filters pushed down to parquet libs such as `FilterApi.eq(col1, -719162)` are rebased according the rebase settings. For the example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description https://github.com/apache/spark/pull/28067 shows the diffs between two calendars.

### Why are the changes needed?
The changes fix the bug portrayed by the following example from SPARK-36034:
```scala
In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```
The result must have the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?
In some sense, yes. Query results can be different in some cases. For the example above:
```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?
By running the modified test suite `ParquetFilterSuite`:
```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit b09b7f7cc0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 22:22:09 +03:00
Gengliang Wang fce19ab31a [SPARK-36135][SQL] Support TimestampNTZ type in file partitioning
### What changes were proposed in this pull request?

Support TimestampNTZ type in file partitioning
* When there is no provided schema and the default Timestamp type is TimestampNTZ , Spark should infer and parse the timestamp value partitions as TimestampNTZ.
* When the provided Partition schema is TimestampNTZ, Spark should be able to parse the TimestampNTZ type partition column.

### Why are the changes needed?

File partitioning is an important feature and Spark should support TimestampNTZ type in it.

### Does this PR introduce _any_ user-facing change?

Yes, Spark supports TimestampNTZ type in file partitioning

### How was this patch tested?

Unit tests

Closes #33344 from gengliangwang/partition.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 96c2919988)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-16 01:13:49 +08:00
Yuming Wang 1ed72e2e8e [SPARK-32792][SQL][FOLLOWUP] Fix Parquet filter pushdown NOT IN predicate
### What changes were proposed in this pull request?

This pr fix Parquet filter pushdown `NOT` `IN` predicate if its values exceeds `spark.sql.parquet.pushdown.inFilterThreshold`. For example: `Not(In(a, Array(2, 3, 7))`. We can not push down `not(and(gteq(a, 2), lteq(a, 7)))`.

### Why are the changes needed?

Fix bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33365 from wangyum/SPARK-32792-3.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 0062c03c15)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 18:52:06 +03:00
PengLei 1a3c56d3ea [SPARK-29519][SQL][FOLLOWUP] Keep output is deterministic for show tblproperties
### What changes were proposed in this pull request?
Keep the output order is deterministic for `SHOW TBLPROPERTIES`

### Why are the changes needed?
[#33343](https://github.com/apache/spark/pull/33343#issue-689828187).
Keep the output order deterministic meaningful.

Since the properties are sorted and then compare result in the testcase for `SHOW TBLPROPERTIES`,  it does not fail, but ideally, the output is ordered and deterministic.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existed ut test

Closes #33353 from Peng-Lei/order-ouput-properties.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e05441c223)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-15 21:44:27 +08:00
Kousuke Saruta 42710991e2 [SPARK-33898][SQL][FOLLOWUP] Fix the behavior of SHOW CREATE TABLE to output deterministic results
### What changes were proposed in this pull request?

This PR fixes a behavior of `SHOW CREATE TABLE` added in `SPARK-33898` (#32931) to output deterministic result.
A test `SPARK-33898: SHOW CREATE TABLE` in `DataSourceV2SQLSuite` compares two `CREATE TABLE` statements. One is generated by `SHOW CREATE TABLE` against a created table and the other is expected `CREATE TABLE` statement.

The created table has options `from` and `to`, and they are declared in this order.
```
CREATE TABLE $t (
  a bigint NOT NULL,
  b bigint,
  c bigint,
  `extra col` ARRAY<INT>,
  `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>>
)
USING foo
OPTIONS (
  from = 0,
  to = 1)
COMMENT 'This is a comment'
TBLPROPERTIES ('prop1' = '1')
PARTITIONED BY (a)
LOCATION '/tmp'
```

And the expected `CREATE TABLE` in the test code is like as follows.
```
"CREATE TABLE testcat.ns1.ns2.tbl (",
"`a` BIGINT NOT NULL,",
"`b` BIGINT,",
"`c` BIGINT,",
"`extra col` ARRAY<INT>,",
"`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)",
"USING foo",
"OPTIONS(",
"'from' = '0',",
"'to' = '1')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION '/tmp'",
"TBLPROPERTIES(",
"'prop1' = '1')"
```
As you can see, the order of `from` and `to` is expected.
But options are implemented as `Map` so the order of key cannot be kept.

In fact, this test fails with Scala 2.13.
```
[info] - SPARK-33898: SHOW CREATE TABLE *** FAILED *** (515 milliseconds)
[info]   Array("CREATE TABLE testcat.ns1.ns2.tbl (", "`a` BIGINT NOT NULL,", "`b` BIGINT,", "`c` BIGINT,", "`extra col` ARRAY<INT>,", "`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)", "USING foo", "OPTIONS(", "'to' = '1',", "'from' = '0')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", "LOCATION '/tmp'", "TBLPROPERTIES(", "'prop1' = '1')") did not equal Array("CREATE TABLE testcat.ns1.ns2.tbl (", "`a` BIGINT NOT NULL,", "`b` BIGINT,", "`c` BIGINT,", "`extra col` ARRAY<INT>,", "`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)", "USING foo", "OPTIONS(", "'from' = '0',", "'to' = '1')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", "LOCATION '/tmp'", "TBLPROPERTIES(", "'prop1' = '1')") (DataSourceV2SQLSuite.scala:1997)
```
In the current master, the test doesn't fail with Scala 2.12 but it's still non-deterministic.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that the modified test passed with both Scala 2.12 and Scala 2.13 with this change.

Closes #33343 from sarutak/fix-show-create-table-test.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f95ca31c0f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-15 20:53:32 +09:00
Gengliang Wang 75ff69a994 [SPARK-36037][TESTS][FOLLOWUP] Avoid wrong test results on daylight saving time
### What changes were proposed in this pull request?

Only use the zone ids that has no daylight saving for testing `localtimestamp`

### Why are the changes needed?

https://github.com/apache/spark/pull/33346#discussion_r670135296 MaxGekk suggests that we should avoid wrong results if possible.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

Unit test

Closes #33354 from gengliangwang/FIxDST.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 564d3de7c6)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-15 11:41:04 +03:00
Gengliang Wang 83b37dfaf0 [SPARK-36037][SQL][FOLLOWUP] Fix flaky test for datetime function localtimestamp
### What changes were proposed in this pull request?

The threshold of the test case "datetime function localtimestamp" is small, which leads to flaky test results
https://github.com/gengliangwang/spark/runs/3067396143?check_suite_focus=true

This PR is to increase the threshold for checking two the different current local datetimes from 5ms to 1 second. (The test case of current_timestamp uses 5 seconds)
### Why are the changes needed?

Fix flaky test
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #33346 from gengliangwang/fixFlaky.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 0973397721)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-15 11:32:35 +08:00
Karen Feng 8b35bc4d2b [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
### What changes were proposed in this pull request?

Adds error classes to some of the exceptions in QueryCompilationErrors.

### Why are the changes needed?

Improves auditing for developers and adds useful fields for users (error class and SQLSTATE).

### Does this PR introduce _any_ user-facing change?

Yes, fills in missing error class and SQLSTATE fields.

### How was this patch tested?

Existing tests and new unit tests.

Closes #33309 from karenfeng/group-compilation-errors-1.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit e92b8ea6f8)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-15 11:43:32 +09:00
ulysses-you 0da71548a5 [SPARK-35639][SQL][FOLLOWUP] Make hasCoalescedPartition return true if something was actually coalesced
### What changes were proposed in this pull request?

Add `CoalescedPartitionSpec(0, 0, _)` check if a `CoalescedPartitionSpec` is coalesced.

### Why are the changes needed?

Fix corner case.

### Does this PR introduce _any_ user-facing change?

yes, UI may be changed

### How was this patch tested?

Add test

Closes #33342 from ulysses-you/SPARK-35639-FOLLOW.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 3819641201)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 22:05:05 +08:00
Chao Sun 4cc7d9b8f1 [SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly
### What changes were proposed in this pull request?

Fix the skipping values logic in Parquet vectorized reader when column index is effective, by considering nulls and only call `ParquetVectorUpdater.skipValues` when the values are non-null.

### Why are the changes needed?

Currently, the Parquet vectorized reader may not work correctly if column index filtering is effective, and the data page contains null values. For instance, let's say we have two columns `c1: BIGINT` and `c2: STRING`, and the following pages:
```
   * c1        500       500       500       500
   *  |---------|---------|---------|---------|
   *  |-------|-----|-----|---|---|---|---|---|
   * c2     400   300   300 200 200 200 200 200
```

and suppose we have a query like the following:
```sql
SELECT * FROM t WHERE c1 = 500
```

this will create a Parquet row range `[500, 1000)` which, when applied to `c2`, will require us to skip all the rows in `[400,500)`. However the current logic for skipping rows is via `updater.skipValues(n, valueReader)` which is incorrect since this skips the next `n` non-null values. In the case when nulls are present, this will not work correctly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test in `ParquetColumnIndexSuite`.

Closes #33330 from sunchao/SPARK-36123-skip-nulls.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e980c7a840)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 18:14:33 +08:00
Linhong Liu c9813f74e9 [SPARK-35780][SQL] Support DATE/TIMESTAMP literals across the full range
### What changes were proposed in this pull request?
DATE/TIMESTAMP literals support years 0000 to 9999. However, internally we support a range that is much larger.
We can add or subtract large intervals from a date/timestamp and the system will happily process and display large negative and positive dates.

Since we obviously cannot put this genie back into the bottle the only thing we can do is allow matching DATE/TIMESTAMP literals.

### Why are the changes needed?
make spark more usable and bug fix

### Does this PR introduce _any_ user-facing change?
Yes, after this PR, below SQL will have different results
```sql
select cast('-10000-1-2' as date) as date_col
-- before PR: NULL
-- after PR: -10000-1-2
```

```sql
select cast('2021-4294967297-11' as date) as date_col
-- before PR: 2021-01-11
-- after PR: NULL
```

### How was this patch tested?
newly added test cases

Closes #32959 from linhongliu-db/SPARK-35780-full-range-datetime.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b86645776b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 18:11:53 +08:00
Jungtaek Lim dcee7a65fd [SPARK-34892][SS] Introduce MergingSortWithSessionWindowStateIterator sorting input rows and rows in state efficiently
Introduction: this PR is a part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces MergingSortWithSessionWindowStateIterator, which does "merge sort" between input rows and sessions in state based on group key and session's start time.

Note that the iterator does merge sort among input rows and sessions grouped by grouping key. The iterator doesn't provide sessions in state which keys don't exist in input rows. For input rows, the iterator will provide all rows regardless of the existence of matching sessions in state.

MergingSortWithSessionWindowStateIterator works on the precondition that given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort.

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT added.

Closes #33077 from HeartSaVioR/SPARK-34892-SPARK-10816-PR-31570-part-4.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 12a576f175)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-14 18:48:05 +09:00
Fu Chen 5bc06fd7d9 [SPARK-36130][SQL] UnwrapCastInBinaryComparison should skip In expression when in.list contains an expression that is not literal
### What changes were proposed in this pull request?

Fix [comment](https://github.com/apache/spark/pull/32488#issuecomment-879315179)
This PR fix rule `UnwrapCastInBinaryComparison` bug. Rule UnwrapCastInBinaryComparison should skip In expression when in.list contains an expression that is not literal.

- In

Before this pr, the following example will throw an exception.
```scala
  withTable("tbl") {
    sql("CREATE TABLE tbl (d decimal(33, 27)) USING PARQUET")
    sql("SELECT d FROM tbl WHERE d NOT IN (d + 1)")
  }
```
- InSet

As the analyzer guarantee that all the elements in the `inSet.hset` are literal, so this is not an issue for `InSet`.

fbf53dee37/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala (L264-L279)

### Does this PR introduce _any_ user-facing change?

No, only bug fix.

### How was this patch tested?

New test.

Closes #33335 from cfmcgrady/SPARK-36130.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 103d16e868)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 15:57:23 +08:00
Eugene Koifman 78796349d9 [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced
### What changes were proposed in this pull request?
Fix `CustomShuffleReaderExec.hasCoalescedPartition` so that it returns true only if some original partitions got combined

### Why are the changes needed?
W/o this change `CustomShuffleReaderExec` description can report `coalesced` even though partitions are unchanged

### Does this PR introduce _any_ user-facing change?
Yes, the `Arguments` in the node description is now accurate:
```
(16) CustomShuffleReader
Input [3]: [registration#4, sum#85, count#86L]
Arguments: coalesced
```

### How was this patch tested?
Existing tests

Closes #32872 from ekoifman/PRISM-77023-fix-hasCoalescedPartition.

Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4033b2a3f4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 15:48:24 +08:00
gengjiaan 1686cff9a1 [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
### What changes were proposed in this pull request?
`LOCALTIMESTAMP()` is a datetime value function from ANSI SQL.
The syntax show below:
```
<datetime value function> ::=
    <current date value function>
  | <current time value function>
  | <current timestamp value function>
  | <current local time value function>
  | <current local timestamp value function>
<current date value function> ::=
CURRENT_DATE
<current time value function> ::=
CURRENT_TIME [ <left paren> <time precision> <right paren> ]
<current local time value function> ::=
LOCALTIME [ <left paren> <time precision> <right paren> ]
<current timestamp value function> ::=
CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
<current local timestamp value function> ::=
LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
```

`LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITH OUT TIME ZONE. This is similar to `CURRENT_TIMESTAMP()`.
Note we need to update the optimization rule `ComputeCurrentTime` so that Spark returns the same result in a single query if the function is called multiple times.

### Why are the changes needed?
`CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
`LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
The `LOCALTIMESTAMP` function is an ANSI SQL.
The `LOCALTIMESTAMP` function is very useful.

### Does this PR introduce _any_ user-facing change?
'Yes'. Support new function `LOCALTIMESTAMP()`.

### How was this patch tested?
New tests.

Closes #33258 from beliefer/SPARK-36037.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b4f7758944)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 15:39:02 +08:00
Chao Sun b4355608e0 [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
### What changes were proposed in this pull request?

Refactor `ParquetColumnIndexSuite` and allow better code reuse.

### Why are the changes needed?

A few methods in the test suite can share the same utility method `checkUnalignedPages` so it's better to do that and remove code duplication.

Additionally, `parquet.enable.dictionary` is tested for both `true` and `false` combination.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33334 from sunchao/SPARK-35743-test-refactoring.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7a7b086534)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-13 22:50:07 -07:00
Jungtaek Lim fa8c37acb1 [SPARK-34891][SS] Introduce state store manager for session window in streaming query
Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces state store manager for session window in streaming query. Session window in batch query wouldn't need to leverage state store manager.

This PR ensures versioning on state format for state store manager, so that we can apply further optimization after releasing Spark version. StreamingSessionWindowStateManager is a trait defining the available methods in session window state store manager. Its subclasses are classes implementing the trait with versioning.

The format of version 1 leverages the new feature of "prefix match scan" to represent the session windows:

* full key : [ group keys, start time in session window ]
* prefix key [ group keys ]

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite added

Closes #31989 from HeartSaVioR/SPARK-34891-SPARK-10816-PR-31570-part-3.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 0fe2d809d6)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-13 08:58:42 -07:00
Gengliang Wang 3ace01b25b [SPARK-36120][SQL] Support TimestampNTZ type in cache table
### What changes were proposed in this pull request?

Support TimestampNTZ type column in SQL command Cache table

### Why are the changes needed?

Cache table should support the new timestamp type.

### Does this PR introduce _any_ user-facing change?

Yes, the TimemstampNTZ type column can used in `CACHE TABLE`

### How was this patch tested?

Unit test

Closes #33322 from gengliangwang/cacheTable.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 067432705f)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-13 17:24:03 +03:00
Wenchen Fan a0f61ccfe4 [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
### What changes were proposed in this pull request?

Make sure all physical plans of TPCDS queries are valid (satisfy the partitioning requirement).

### Why are the changes needed?

improve test coverage

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #33248 from cloud-fan/aqe2.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 583173b7cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 21:17:25 +08:00
Wenchen Fan 017b7d3f0b [SPARK-36074][SQL] Add error class for StructType.findNestedField
### What changes were proposed in this pull request?

This PR adds an INVALID_FIELD_NAME error class for the errors in `StructType.findNestedField`. It also cleans up the code there and adds UT for this method.

### Why are the changes needed?

follow the new error message framework

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33282 from cloud-fan/error.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 21:15:00 +08:00
Max Gekk b4692949f8 [SPARK-35735][SQL][FOLLOWUP] Remove unused method IntervalUtils.checkIntervalStringDataType()
### What changes were proposed in this pull request?
Remove the private method `checkIntervalStringDataType()` from `IntervalUtils` since it hasn't been used anymore after https://github.com/apache/spark/pull/33242.

### Why are the changes needed?
To improve code maintenance.

### Does this PR introduce _any_ user-facing change?
No. The method is private, and it existing in code base for short time.

### How was this patch tested?
By existing GAs/tests.

Closes #33321 from MaxGekk/SPARK-35735-remove-unused-method.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 1ba3982d16)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-13 15:11:32 +03:00
Kousuke Saruta 1f8e72f9b1 [SPARK-35749][SPARK-35773][SQL] Parse unit list interval literals as tightest year-month/day-time interval types
### What changes were proposed in this pull request?

This PR allow the parser to parse unit list interval literals like `'3' day '10' hours '3' seconds` or `'8' years '3' months` as `YearMonthIntervalType` or `DayTimeIntervalType`.

### Why are the changes needed?

For ANSI compliance.

### Does this PR introduce _any_ user-facing change?

Yes. I noted the following things in the `sql-migration-guide.md`.

* Unit list interval literals are parsed as `YearMonthIntervaType` or `DayTimeIntervalType` instead of `CalendarIntervalType`.
* `WEEK`, `MILLISECONS`, `MICROSECOND` and `NANOSECOND` are not valid units for unit list interval literals.
* Units of year-month and day-time cannot be mixed like `1 YEAR 2 MINUTES`.

### How was this patch tested?

New tests and modified tests.

Closes #32949 from sarutak/day-time-multi-units.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8e92ef825a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 18:55:22 +08:00
Gengliang Wang 2781bf0721 [SPARK-36119][SQL] Add new SQL function to_timestamp_ltz
### What changes were proposed in this pull request?

Add new SQL function `to_timestamp_ltz`
syntax:
```
to_timestamp_ltz(timestamp_str_column[, fmt])
to_timestamp_ltz(timestamp_column)
to_timestamp_ltz(date_column)
```

### Why are the changes needed?

As the result of to_timestamp become consistent with the SQL configuration spark.sql.timestmapType and there is already a SQL function to_timestmap_ntz, we need new function to_timestamp_ltz to construct timestamp with local time zone values.

### Does this PR introduce _any_ user-facing change?

Yes, a new function for constructing timestamp with local time zone values

### How was this patch tested?

Unit test

Closes #33318 from gengliangwang/to_timestamp_ltz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 01ddaf3918)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-13 17:38:05 +08:00
allisonwang-db 78c4e3710d [SPARK-35551][SQL] Handle the COUNT bug for lateral subqueries
### What changes were proposed in this pull request?
This PR modifies `DecorrelateInnerQuery` to handle the COUNT bug for lateral subqueries. Similar to SPARK-15370, rewriting lateral subqueries as joins can change the semantics of the subquery and lead to incorrect answers.

However we can't reuse the existing code to handle the count bug for correlated scalar subqueries because it assumes the subquery to have a specific shape (either with Filter + Aggregate or Aggregate as the root node). Instead, this PR proposes a more generic way to handle the COUNT bug. If an Aggregate is subject to the COUNT bug, we insert a left outer domain join between the outer query and the aggregate with a `alwaysTrue` marker and rewrite the final result conditioning on the marker. For example:

```sql
-- t1: [(0, 1), (1, 2)]
-- t2: [(0, 2), (0, 3)]
select * from t1 left outer join lateral (select count(*) from t2 where t2.c1 = t1.c1)
```

Without count bug handling, the query plan is
```
Project [c1#44, c2#45, count(1)#53L]
+- Join LeftOuter, (c1#48 = c1#44)
   :- LocalRelation [c1#44, c2#45]
   +- Aggregate [c1#48], [count(1) AS count(1)#53L, c1#48]
      +- LocalRelation [c1#48]
```
and the answer is wrong:
```
+---+---+--------+
|c1 |c2 |count(1)|
+---+---+--------+
|0  |1  |2       |
|1  |2  |null    |
+---+---+--------+
```

With the count bug handling:
```
Project [c1#1, c2#2, count(1)#10L]
+- Join LeftOuter, (c1#34 <=> c1#1)
   :- LocalRelation [c1#1, c2#2]
   +- Project [if (isnull(alwaysTrue#32)) 0 else count(1)#33L AS count(1)#10L, c1#34]
      +- Join LeftOuter, (c1#5 = c1#34)
         :- Aggregate [c1#1], [c1#1 AS c1#34]
         :  +- LocalRelation [c1#1]
         +- Aggregate [c1#5], [count(1) AS count(1)#33L, c1#5, true AS alwaysTrue#32]
            +- LocalRelation [c1#5]
```
and we have the correct answer:
```
+---+---+--------+
|c1 |c2 |count(1)|
+---+---+--------+
|0  |1  |2       |
|1  |2  |0       |
+---+---+--------+
```

### Why are the changes needed?
Fix a correctness bug with lateral join rewrite.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added SQL query tests. The results are consistent with Postgres' results.

Closes #33070 from allisonwang-db/spark-35551-lateral-count-bug.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4f760f2b1f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 17:35:20 +08:00
Liang-Chi Hsieh 19478bdf52 [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
### What changes were proposed in this pull request?

This patch proposes to check data after adding data to topic in `KafkaSourceStressSuite`.

### Why are the changes needed?

The test logic in `KafkaSourceStressSuite` is not stable. For example, https://github.com/apache/spark/runs/3049244904.

Once we add data to a topic and then delete the topic before checking data, the expected answer is different to retrieved data from the sink.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33311 from viirya/stream-assert.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 201566cdd5)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-13 01:22:06 -07:00
Wenchen Fan c1d8ccfb64 Revert "[SPARK-35253][SPARK-35398][SQL][BUILD] Bump up the janino version to v3.1.4"
### What changes were proposed in this pull request?

This PR reverts https://github.com/apache/spark/pull/32455 and its followup https://github.com/apache/spark/pull/32536 , because the new janino version has a bug that is not fixed yet: https://github.com/janino-compiler/janino/pull/148

### Why are the changes needed?

avoid regressions

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33302 from cloud-fan/revert.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ae6199af44)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-13 12:14:21 +09:00
Gengliang Wang fba3e90863 [SPARK-36046][SQL] Support new functions make_timestamp_ntz and make_timestamp_ltz
### What changes were proposed in this pull request?

Support new functions make_timestamp_ntz and make_timestamp_ltz
Syntax:
* `make_timestamp_ntz(year, month, day, hour, min, sec)`: Create local date-time from year, month, day, hour, min, sec fields
* `make_timestamp_ltz(year, month, day, hour, min, sec[, timezone])`: Create current timestamp with local time zone from year, month, day, hour, min, sec and timezone fields

### Why are the changes needed?

As the result of `make_timestamp` become consistent with the SQL configuration `spark.sql.timestmapType`, we need these two new functions to construct timestamp literals. They align to the functions [`make_timestamp` and `make_timestamptz`](https://www.postgresql.org/docs/9.4/functions-datetime.html) in PostgreSQL

### Does this PR introduce _any_ user-facing change?

Yes, two new datetime functions: make_timestamp_ntz and make_timestamp_ltz.

### How was this patch tested?

End-to-end tests.

Closes #33299 from gengliangwang/make_timestamp_ntz_ltz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 92bf83ed0a)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-12 22:44:38 +03:00