Commit graph

8255 commits

Author SHA1 Message Date
Jungtaek Lim dcee7a65fd [SPARK-34892][SS] Introduce MergingSortWithSessionWindowStateIterator sorting input rows and rows in state efficiently
Introduction: this PR is a part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces MergingSortWithSessionWindowStateIterator, which does "merge sort" between input rows and sessions in state based on group key and session's start time.

Note that the iterator does merge sort among input rows and sessions grouped by grouping key. The iterator doesn't provide sessions in state which keys don't exist in input rows. For input rows, the iterator will provide all rows regardless of the existence of matching sessions in state.

MergingSortWithSessionWindowStateIterator works on the precondition that given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort.

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT added.

Closes #33077 from HeartSaVioR/SPARK-34892-SPARK-10816-PR-31570-part-4.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 12a576f175)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-14 18:48:05 +09:00
Eugene Koifman 78796349d9 [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced
### What changes were proposed in this pull request?
Fix `CustomShuffleReaderExec.hasCoalescedPartition` so that it returns true only if some original partitions got combined

### Why are the changes needed?
W/o this change `CustomShuffleReaderExec` description can report `coalesced` even though partitions are unchanged

### Does this PR introduce _any_ user-facing change?
Yes, the `Arguments` in the node description is now accurate:
```
(16) CustomShuffleReader
Input [3]: [registration#4, sum#85, count#86L]
Arguments: coalesced
```

### How was this patch tested?
Existing tests

Closes #32872 from ekoifman/PRISM-77023-fix-hasCoalescedPartition.

Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4033b2a3f4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 15:48:24 +08:00
gengjiaan 1686cff9a1 [SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function
### What changes were proposed in this pull request?
`LOCALTIMESTAMP()` is a datetime value function from ANSI SQL.
The syntax show below:
```
<datetime value function> ::=
    <current date value function>
  | <current time value function>
  | <current timestamp value function>
  | <current local time value function>
  | <current local timestamp value function>
<current date value function> ::=
CURRENT_DATE
<current time value function> ::=
CURRENT_TIME [ <left paren> <time precision> <right paren> ]
<current local time value function> ::=
LOCALTIME [ <left paren> <time precision> <right paren> ]
<current timestamp value function> ::=
CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
<current local timestamp value function> ::=
LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
```

`LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITH OUT TIME ZONE. This is similar to `CURRENT_TIMESTAMP()`.
Note we need to update the optimization rule `ComputeCurrentTime` so that Spark returns the same result in a single query if the function is called multiple times.

### Why are the changes needed?
`CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
`LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
The `LOCALTIMESTAMP` function is an ANSI SQL.
The `LOCALTIMESTAMP` function is very useful.

### Does this PR introduce _any_ user-facing change?
'Yes'. Support new function `LOCALTIMESTAMP()`.

### How was this patch tested?
New tests.

Closes #33258 from beliefer/SPARK-36037.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b4f7758944)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-14 15:39:02 +08:00
Chao Sun b4355608e0 [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
### What changes were proposed in this pull request?

Refactor `ParquetColumnIndexSuite` and allow better code reuse.

### Why are the changes needed?

A few methods in the test suite can share the same utility method `checkUnalignedPages` so it's better to do that and remove code duplication.

Additionally, `parquet.enable.dictionary` is tested for both `true` and `false` combination.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33334 from sunchao/SPARK-35743-test-refactoring.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7a7b086534)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-13 22:50:07 -07:00
Jungtaek Lim fa8c37acb1 [SPARK-34891][SS] Introduce state store manager for session window in streaming query
Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces state store manager for session window in streaming query. Session window in batch query wouldn't need to leverage state store manager.

This PR ensures versioning on state format for state store manager, so that we can apply further optimization after releasing Spark version. StreamingSessionWindowStateManager is a trait defining the available methods in session window state store manager. Its subclasses are classes implementing the trait with versioning.

The format of version 1 leverages the new feature of "prefix match scan" to represent the session windows:

* full key : [ group keys, start time in session window ]
* prefix key [ group keys ]

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite added

Closes #31989 from HeartSaVioR/SPARK-34891-SPARK-10816-PR-31570-part-3.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 0fe2d809d6)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-13 08:58:42 -07:00
Gengliang Wang 3ace01b25b [SPARK-36120][SQL] Support TimestampNTZ type in cache table
### What changes were proposed in this pull request?

Support TimestampNTZ type column in SQL command Cache table

### Why are the changes needed?

Cache table should support the new timestamp type.

### Does this PR introduce _any_ user-facing change?

Yes, the TimemstampNTZ type column can used in `CACHE TABLE`

### How was this patch tested?

Unit test

Closes #33322 from gengliangwang/cacheTable.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 067432705f)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-13 17:24:03 +03:00
Wenchen Fan a0f61ccfe4 [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
### What changes were proposed in this pull request?

Make sure all physical plans of TPCDS queries are valid (satisfy the partitioning requirement).

### Why are the changes needed?

improve test coverage

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #33248 from cloud-fan/aqe2.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 583173b7cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 21:17:25 +08:00
Kousuke Saruta 1f8e72f9b1 [SPARK-35749][SPARK-35773][SQL] Parse unit list interval literals as tightest year-month/day-time interval types
### What changes were proposed in this pull request?

This PR allow the parser to parse unit list interval literals like `'3' day '10' hours '3' seconds` or `'8' years '3' months` as `YearMonthIntervalType` or `DayTimeIntervalType`.

### Why are the changes needed?

For ANSI compliance.

### Does this PR introduce _any_ user-facing change?

Yes. I noted the following things in the `sql-migration-guide.md`.

* Unit list interval literals are parsed as `YearMonthIntervaType` or `DayTimeIntervalType` instead of `CalendarIntervalType`.
* `WEEK`, `MILLISECONS`, `MICROSECOND` and `NANOSECOND` are not valid units for unit list interval literals.
* Units of year-month and day-time cannot be mixed like `1 YEAR 2 MINUTES`.

### How was this patch tested?

New tests and modified tests.

Closes #32949 from sarutak/day-time-multi-units.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8e92ef825a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 18:55:22 +08:00
Gengliang Wang 2781bf0721 [SPARK-36119][SQL] Add new SQL function to_timestamp_ltz
### What changes were proposed in this pull request?

Add new SQL function `to_timestamp_ltz`
syntax:
```
to_timestamp_ltz(timestamp_str_column[, fmt])
to_timestamp_ltz(timestamp_column)
to_timestamp_ltz(date_column)
```

### Why are the changes needed?

As the result of to_timestamp become consistent with the SQL configuration spark.sql.timestmapType and there is already a SQL function to_timestmap_ntz, we need new function to_timestamp_ltz to construct timestamp with local time zone values.

### Does this PR introduce _any_ user-facing change?

Yes, a new function for constructing timestamp with local time zone values

### How was this patch tested?

Unit test

Closes #33318 from gengliangwang/to_timestamp_ltz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 01ddaf3918)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-13 17:38:05 +08:00
allisonwang-db 78c4e3710d [SPARK-35551][SQL] Handle the COUNT bug for lateral subqueries
### What changes were proposed in this pull request?
This PR modifies `DecorrelateInnerQuery` to handle the COUNT bug for lateral subqueries. Similar to SPARK-15370, rewriting lateral subqueries as joins can change the semantics of the subquery and lead to incorrect answers.

However we can't reuse the existing code to handle the count bug for correlated scalar subqueries because it assumes the subquery to have a specific shape (either with Filter + Aggregate or Aggregate as the root node). Instead, this PR proposes a more generic way to handle the COUNT bug. If an Aggregate is subject to the COUNT bug, we insert a left outer domain join between the outer query and the aggregate with a `alwaysTrue` marker and rewrite the final result conditioning on the marker. For example:

```sql
-- t1: [(0, 1), (1, 2)]
-- t2: [(0, 2), (0, 3)]
select * from t1 left outer join lateral (select count(*) from t2 where t2.c1 = t1.c1)
```

Without count bug handling, the query plan is
```
Project [c1#44, c2#45, count(1)#53L]
+- Join LeftOuter, (c1#48 = c1#44)
   :- LocalRelation [c1#44, c2#45]
   +- Aggregate [c1#48], [count(1) AS count(1)#53L, c1#48]
      +- LocalRelation [c1#48]
```
and the answer is wrong:
```
+---+---+--------+
|c1 |c2 |count(1)|
+---+---+--------+
|0  |1  |2       |
|1  |2  |null    |
+---+---+--------+
```

With the count bug handling:
```
Project [c1#1, c2#2, count(1)#10L]
+- Join LeftOuter, (c1#34 <=> c1#1)
   :- LocalRelation [c1#1, c2#2]
   +- Project [if (isnull(alwaysTrue#32)) 0 else count(1)#33L AS count(1)#10L, c1#34]
      +- Join LeftOuter, (c1#5 = c1#34)
         :- Aggregate [c1#1], [c1#1 AS c1#34]
         :  +- LocalRelation [c1#1]
         +- Aggregate [c1#5], [count(1) AS count(1)#33L, c1#5, true AS alwaysTrue#32]
            +- LocalRelation [c1#5]
```
and we have the correct answer:
```
+---+---+--------+
|c1 |c2 |count(1)|
+---+---+--------+
|0  |1  |2       |
|1  |2  |0       |
+---+---+--------+
```

### Why are the changes needed?
Fix a correctness bug with lateral join rewrite.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added SQL query tests. The results are consistent with Postgres' results.

Closes #33070 from allisonwang-db/spark-35551-lateral-count-bug.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4f760f2b1f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-13 17:35:20 +08:00
Liang-Chi Hsieh 19478bdf52 [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
### What changes were proposed in this pull request?

This patch proposes to check data after adding data to topic in `KafkaSourceStressSuite`.

### Why are the changes needed?

The test logic in `KafkaSourceStressSuite` is not stable. For example, https://github.com/apache/spark/runs/3049244904.

Once we add data to a topic and then delete the topic before checking data, the expected answer is different to retrieved data from the sink.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33311 from viirya/stream-assert.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 201566cdd5)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-13 01:22:06 -07:00
Gengliang Wang fba3e90863 [SPARK-36046][SQL] Support new functions make_timestamp_ntz and make_timestamp_ltz
### What changes were proposed in this pull request?

Support new functions make_timestamp_ntz and make_timestamp_ltz
Syntax:
* `make_timestamp_ntz(year, month, day, hour, min, sec)`: Create local date-time from year, month, day, hour, min, sec fields
* `make_timestamp_ltz(year, month, day, hour, min, sec[, timezone])`: Create current timestamp with local time zone from year, month, day, hour, min, sec and timezone fields

### Why are the changes needed?

As the result of `make_timestamp` become consistent with the SQL configuration `spark.sql.timestmapType`, we need these two new functions to construct timestamp literals. They align to the functions [`make_timestamp` and `make_timestamptz`](https://www.postgresql.org/docs/9.4/functions-datetime.html) in PostgreSQL

### Does this PR introduce _any_ user-facing change?

Yes, two new datetime functions: make_timestamp_ntz and make_timestamp_ltz.

### How was this patch tested?

End-to-end tests.

Closes #33299 from gengliangwang/make_timestamp_ntz_ltz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 92bf83ed0a)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-12 22:44:38 +03:00
dgd-contributor 12aecb4330 [SPARK-33603][SQL] Grouping exception messages in execution/command
### What changes were proposed in this pull request?
This PR group exception messages in sql/core/src/main/scala/org/apache/spark/sql/execution/command

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce any user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32951 from dgd-contributor/SPARK-33603_grouping_execution/command.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit d03f71657e)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-13 01:29:03 +08:00
Jungtaek Lim 07011eb779 [SPARK-35861][SS] Introduce "prefix match scan" feature on state store
### What changes were proposed in this pull request?

This PR proposes to introduce a new feature "prefix match scan" on state store, which enables users of state store (mostly stateful operators) to group the keys into logical groups, and scan the keys in the same group efficiently.

For example, if the schema of the key of state store is `[ sessionId | session.start ]`, we can scan with prefix key which schema is `[ sessionId ]` (leftmost 1 column) and retrieve all key-value pairs in state store which keys are matched with given prefix key.

This PR will bring the API changes, though the changes are done in the developer API.

* Registering the prefix key

We propose to make an explicit change to the init() method of StateStoreProvider, as below:

```
def init(
      stateStoreId: StateStoreId,
      keySchema: StructType,
      valueSchema: StructType,
      numColsPrefixKey: Int,
      storeConfs: StateStoreConf,
      hadoopConf: Configuration): Unit
```

Please note that we remove an unused parameter “keyIndexOrdinal” as well. The parameter is coupled with getRange() which we will remove as well. See below for rationalization.

Here we provide the number of columns we take to project the prefix key from the full key. If the operator doesn’t leverage prefix match scan, the value can (and should) be 0, because the state store provider may optimize the underlying storage format which may bring extra overhead.

We would like to apply some restrictions on prefix key to simplify the functionality:

* Prefix key is a part of the full key. It can’t be the same as the full key.
  * That said, the full key will be the (prefix key + remaining parts), and both prefix key and remaining parts should have at least one column.
* We always take the columns from the leftmost sequentially, like “seq.take(nums)”.
* We don’t allow reordering of the columns.
* We only guarantee “equality” comparison against prefix keys, and don’t support the prefix “range” scan.
  * We only support scanning on the keys which match with the prefix key.
  * E.g. We don’t support the range scan from user A to user B due to technical complexity. That’s the reason we can’t leverage the existing getRange API.

As we mentioned, we want to make an explicit change to the init() method of StateStoreProvider which would break backward compatibility, assuming that 3rd party state store providers need to update their code in any way to support prefix match scan. Given RocksDB state store provider is being donated to the OSS and plan to be available in Spark 3.2, the majority of the users would migrate to the built-in state store providers, which would remedy the concerns.

* Scanning key-value pairs matched to the prefix key

We propose to add a new method to the ReadStateStore (and StateStore by inheritance), as below:

```
def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair]
```

We require callers to pass the `prefixKey` which would have the same schema with the registered prefix key schema. In other words, the schema of the parameter `prefixKey` should match to the projection of the prefix key on the full key based on the number of columns for the prefix key.

The method contract is clear - the method will return the iterator which will give the key-value pairs whose prefix key is matched with the given prefix key. Callers should only rely on the contract and should not expect any other characteristics based on specific details on the state store provider.

In the caller’s point of view, the prefix key is only used for retrieving key-value pairs via prefix match scan. Callers should keep using the full key to do CRUD.

Note that this PR also proposes to make a breaking change, removal of getRange(), which is never be implemented properly and hence never be called properly.

### Why are the changes needed?

* Introducing prefix match scan feature

Currently, the API in state store is only based on key-value data structure. This lacks on advanced data structures like list-like one, which required us to implement the data structure on our own whenever we need it. We had one in stream-stream join, and we were about to have another one in native session window. The custom implementation of data structure based on the state store API tends to be complicated and has to deal with multiple state stores.

We decided to enhance the state store API a bit to remove the requirement for native session window to implement its own. From the operator of native session window, it will just need to do prefix scan on group key to retrieve all sessions belonging to the group key.

Thanks to adding the feature to the part of state store API, this would enable state store providers to optimize the implementation based on the characteristic. (e.g. We will implement this in RocksDB state store provider via leveraging the characteristic that RocksDB sorts the key by natural order of binary format.)

* Removal of getRange API

Before introducing this we sought the way to leverage getRange, but it's quite hard to implement efficiently, with respecting its method contract. Spark always calls the method with (None, None) parameter and all the state store providers (including built-in) implement it as just calling iterator(), which is not respecting the method contract. That said, we can replace all getRange() usages to iterator(), and remove the API to remove any confusions/concerns.

### Does this PR introduce _any_ user-facing change?

Yes for the end users & maintainers of 3rd party state store provider. They will need to upgrade their state store provider implementations to adopt this change.

### How was this patch tested?

Added UT, and also existing UTs to make sure it doesn't break anything.

Closes #33038 from HeartSaVioR/SPARK-35861.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 094300fa60)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-12 09:07:07 -07:00
Chao Sun 5b2f191228 [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader
### What changes were proposed in this pull request?

Combine `readBatch` and `readIntegers` in `VectorizedRleValuesReader` by having them share the same `readBatchInternal` method.

### Why are the changes needed?

`readBatch` and `readIntegers` share similar code path and this Jira aims to combine them into one method for easier maintenance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests as this is just a refactoring.

Closes #33271 from sunchao/SPARK-35743-read-integers.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5edbbd1711)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-12 22:30:37 +08:00
Gengliang Wang 4e9e2f32e8 [SPARK-36072][SQL] TO_TIMESTAMP: return different results based on the default timestamp type
### What changes were proposed in this pull request?

The SQL function TO_TIMESTAMP should return different results based on the default timestamp type:
* when "spark.sql.timestampType" is TIMESTAMP_NTZ, return TimestampNTZType literal
* when "spark.sql.timestampType" is TIMESTAMP_LTZ, return TimestampType literal

This PR also refactor the class GetTimestamp and GetTimestampNTZ to reduce duplicated code.

### Why are the changes needed?

As "spark.sql.timestampType" sets the default timestamp type, the to_timestamp function should behave consistently with it.

### Does this PR introduce _any_ user-facing change?

Yes, when the value of "spark.sql.timestampType" is TIMESTAMP_NTZ, the result type of `TO_TIMESTAMP` is of TIMESTAMP_NTZ type.

### How was this patch tested?

Unit test

Closes #33280 from gengliangwang/to_timestamp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 32720dd3e1)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-12 10:12:41 +03:00
gengjiaan 5816482868 [SPARK-36044][SQL] Suport TimestampNTZ in functions unix_timestamp/to_unix_timestamp
### What changes were proposed in this pull request?
The functions `unix_timestamp`/`to_unix_timestamp` should be able to accept input of `TimestampNTZType`.

### Why are the changes needed?
The functions `unix_timestamp`/`to_unix_timestamp` should be able to accept input of `TimestampNTZType`.

### Does this PR introduce _any_ user-facing change?
'Yes'.

### How was this patch tested?
New tests.

Closes #33278 from beliefer/SPARK-36044.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 8738682f6a)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-12 09:55:55 +03:00
Gengliang Wang 09e5bbdfbe [SPARK-36083][SQL] make_timestamp: return different result based on the default timestamp type
### What changes were proposed in this pull request?

The SQL function MAKE_TIMESTAMP should return different results based on the default timestamp type:
* when "spark.sql.timestampType" is TIMESTAMP_NTZ, return TimestampNTZType literal
* when "spark.sql.timestampType" is TIMESTAMP_LTZ, return TimestampType literal

### Why are the changes needed?

As "spark.sql.timestampType" sets the default timestamp type, the make_timestamp function should behave consistently with it.

### Does this PR introduce _any_ user-facing change?

Yes, when the value of "spark.sql.timestampType" is TIMESTAMP_NTZ, the result type of `MAKE_TIMESTAMP` is of TIMESTAMP_NTZ type.

### How was this patch tested?

Unit test

Closes #33290 from gengliangwang/mkTS.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 17ddcc9e82)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-11 20:48:04 +03:00
ulysses-you 2e4929b142 [SPARK-36032][SQL] Use inputPlan instead of currentPhysicalPlan to initialize logical link
### What changes were proposed in this pull request?

Change `currentPhysicalPlan.logicalLink.get` to `inputPlan.logicalLink.get` for initial logical link.

### Why are the changes needed?

At `initialPlan` we may remove some Spark Plan with `queryStagePreparationRules`, if removed Spark Plan is top level node, then we will lose the linked logical node.

Since we support AQE side broadcast join config. It's more common that a join is SMJ at normal planner and changed to BHJ after AQE reOptimize. However, `RemoveRedundantSorts` is applied before reOptimize at `initialPlan`, then a local sort might be removed incorrectly if a join is SMJ at first but changed to BHJ during reOptimize.

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test

Closes #33244 from ulysses-you/SPARK-36032.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 484b50cadf)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-08 22:40:03 -07:00
Takuya UESHIN 55111cafd1 [SPARK-36062][PYTHON] Try to capture faulthanlder when a Python worker crashes
### What changes were proposed in this pull request?

Try to capture the error message from the `faulthandler` when the Python worker crashes.

### Why are the changes needed?

Currently, we just see an error message saying `"exited unexpectedly (crashed)"` when the UDFs causes the Python worker to crash by like segmentation fault.
We should take advantage of [`faulthandler`](https://docs.python.org/3/library/faulthandler.html) and try to capture the error message from the `faulthandler`.

### Does this PR introduce _any_ user-facing change?

Yes, when a Spark config `spark.python.worker.faulthandler.enabled` is `true`, the stack trace will be seen in the error message when the Python worker crashes.

```py
>>> def f():
...   import ctypes
...   ctypes.string_at(0)
...
>>> sc.parallelize([1]).map(lambda x: f()).count()
```

```
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x000000010965b5c0 (most recent call first):
  File "/.../ctypes/__init__.py", line 525 in string_at
  File "<stdin>", line 3 in f
  File "<stdin>", line 1 in <lambda>
...
```

### How was this patch tested?

Added some tests, and manually.

Closes #33273 from ueshin/issues/SPARK-36062/faulthandler.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 115b8a180f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-09 11:31:00 +09:00
Gengliang Wang ae62c9d772 [SPARK-36054][SQL] Support group by TimestampNTZ type column
### What changes were proposed in this pull request?

Support group by TimestampNTZ type column

### Why are the changes needed?

It's a basic SQL operation.

### Does this PR introduce _any_ user-facing change?

No, the new timestmap type is not released yet.

### How was this patch tested?

Unit test

Closes #33268 from gengliangwang/agg.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 382b66e267)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-08 22:33:43 +03:00
Gengliang Wang 9103c1fe23 [SPARK-36055][SQL] Assign pretty SQL string to TimestampNTZ literals
### What changes were proposed in this pull request?

Currently the TimestampNTZ literals shows only long value instead of timestamp string in its SQL string and toString result.
Before changes (with default timestamp type as TIMESTAMP_NTZ)
```
– !query
select timestamp '2019-01-01\t'
– !query schema
struct<1546300800000000:timestamp_ntz>
```

After changes:
```
– !query
select timestamp '2019-01-01\t'
– !query schema
struct<TIMESTAMP_NTZ '2019-01-01 00:00:00':timestamp_ntz>
```
### Why are the changes needed?

Make the schema of TimestampNTZ literals readable.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #33269 from gengliangwang/ntzLiteralString.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit ee945e99cc)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-08 21:43:04 +03:00
PengLei 5ec2ddff6a [SPARK-36012][SQL] Add null flag in SHOW CREATE TABLE
### What changes were proposed in this pull request?
When exec the command `SHOW CREATE TABLE`, we should not lost the info null flag if the table column that
is specified `NOT NULL`

### Why are the changes needed?
[SPARK-36012](https://issues.apache.org/jira/browse/SPARK-36012)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add UT test for V1 and existed UT for V2

Closes #33219 from Peng-Lei/SPARK-36012.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e071721a51)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-09 01:21:57 +08:00
Wenchen Fan b8d3da16b1 [SPARK-35874][SQL] AQE Shuffle should wait for its subqueries to finish before materializing
### What changes were proposed in this pull request?

Currently, AQE uses a very tricky way to trigger and wait for the subqueries:
1. submitting stage calls `QueryStageExec.materialize`
2. `QueryStageExec.materialize` calls `executeQuery`
3. `executeQuery` does some preparation works, which goes to `QueryStageExec.doPrepare`
4. `QueryStageExec.doPrepare` calls `prepare` of shuffle/broadcast, which triggers all the subqueries in this stage
5. `executeQuery` then calls `waitForSubqueries`, which does nothing because `QueryStageExec` itself has no subqueries
6. then we submit the shuffle/broadcast job, without waiting for subqueries
7. for `ShuffleExchangeExec.mapOutputStatisticsFuture`, it calls `child.execute`, which calls `executeQuery` and wait for subqueries in the query tree of `child`
8. The only missing case is: `ShuffleExchangeExec` itself may contain subqueries(repartition expression) and AQE doesn't wait for it.

A simple fix would be overwriting `waitForSubqueries` in `QueryStageExec`, and forward the request to shuffle/broadcast, but this PR proposes a different and probably cleaner way: we follow `execute`/`doExecute` in `SparkPlan`, and add similar APIs in the AQE version of "execute", which gets a future from shuffle/broadcast.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

a query fails without the fix and can run now

### How was this patch tested?

new test

Closes #33058 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2df67a1a1b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-09 00:21:08 +08:00
Karen Feng f31cf163d9 [SPARK-35958][CORE] Refactor SparkError.scala to SparkThrowable.java
### What changes were proposed in this pull request?

Refactors the base Throwable trait `SparkError.scala` (introduced in SPARK-34920) an interface `SparkThrowable.java`.

### Why are the changes needed?

- Renaming `SparkError` to `SparkThrowable` better reflect sthat this is the base interface for both `Exception` and `Error`
- Migrating to Java maximizes its extensibility

### Does this PR introduce _any_ user-facing change?

Yes; the base trait has been renamed and the accessor methods have changed (eg. `sqlState` -> `getSqlState()`).

### How was this patch tested?

Unit tests.

Closes #33164 from karenfeng/SPARK-35958.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 71c086eb87)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-08 23:55:11 +08:00
Yuanjian Li 097b667db7 [SPARK-35988][SS] The implementation for RocksDBStateStoreProvider
### What changes were proposed in this pull request?
Add the implementation for the RocksDBStateStoreProvider. It's the subclass of StateStoreProvider that leverages all the functionalities implemented in the RocksDB instance.

### Why are the changes needed?
The interface for the end-user to use the RocksDB state store.

### Does this PR introduce _any_ user-facing change?
Yes. New RocksDBStateStore can be used in their applications.

### How was this patch tested?
New UT added.

Closes #33187 from xuanyuanking/SPARK-35988.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 0621e78b5f)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-08 21:02:57 +09:00
Gengliang Wang cafb829c42 [SPARK-36043][SQL][TESTS] Add end-to-end tests with default timestamp type as TIMESTAMP_NTZ
### What changes were proposed in this pull request?

Run end-to-end tests with default timestamp type as TIMESTAMP_NTZ to increase test coverage.

### Why are the changes needed?

Inrease test coverage.
Also, there will be more and more expressions have different behaviors when the default timestamp type is TIMESTAMP_NTZ, for example, `to_timestamp`, `from_json`, `from_csv`, and so on. Having this new test suite helps future developments.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI tests.

Closes #33259 from gengliangwang/ntzTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 57342dfc1d)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-07-08 19:39:17 +08:00
Kousuke Saruta 429d1780b3 [SPARK-36022][SQL] Respect interval fields in extract
### What changes were proposed in this pull request?

This PR fixes an issue about `extract`.
`Extract` should process only existing fields of interval types. For example:

```
spark-sql> SELECT EXTRACT(MONTH FROM INTERVAL '2021-11' YEAR TO MONTH);
11
spark-sql> SELECT EXTRACT(MONTH FROM INTERVAL '2021' YEAR);
0
```
The last command should fail as the month field doesn't present in INTERVAL YEAR.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33247 from sarutak/fix-extract-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 39002cb995)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-08 09:41:09 +03:00
Cheng Su 12b29cd41a [SPARK-32577][SQL][TEST][FOLLOWUP] Fix the config value of shuffled hash join for all other test queries
### What changes were proposed in this pull request?

This is the followup from https://github.com/apache/spark/pull/33236#issuecomment-875242730, where we are fixing the config value of shuffled hash join, for all other test queries. Found all configs by searching in https://github.com/apache/spark/search?q=spark.sql.join.preferSortMergeJoin .

### Why are the changes needed?

Fix test to have better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #33249 from c21/join-test.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 23943e5e40)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-08 13:17:47 +09:00
Angerszhuuuu 74bfbcd643 [SPARK-36021][SQL] Parse interval literals should support more than 2 digits
### What changes were proposed in this pull request?
For case
```
spark-sql> select interval '123456:12' minute to second;
Error in query:
requirement failed: Interval string must match day-time format of '^(?<sign>[+|-])?(?<minute>\d{1,2}):(?<second>(\d{1,2})(\.(\d{1,9}))?)$': 123456:12, set spark.sql.legacy.fromDayTimeString.enabled to true to restore the behavior before Spark 3.0.(line 1, pos 16)

== SQL ==
select interval '123456:12' minute to second
----------------^^^
```

we should support hour/minute/second when for more than 2 digits when parse interval literal string

### Why are the changes needed?
Keep consistence

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33231 from AngersZhuuuu/SPARK-36021.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit ea3333a200)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-07 20:31:40 +03:00
gengjiaan 2fc57bba31 [SPARK-36015][SQL] Support TimestampNTZType in the Window spec definition
### What changes were proposed in this pull request?
The method `WindowSpecDefinition.isValidFrameType` doesn't consider `TimestampNTZType`. We should support it as for `TimestampType`.

### Why are the changes needed?
Support `TimestampNTZType` in the Window spec definition.

### Does this PR introduce _any_ user-facing change?
'Yes'. This PR allows users use  `TimestampNTZType` as the sort spec in window spec definition.

### How was this patch tested?
New tests.

Closes #33246 from beliefer/SPARK-36015.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 62ff2add94)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-07 20:27:16 +03:00
gengjiaan 25ea296c3c [SPARK-36017][SQL] Support TimestampNTZType in expression ApproximatePercentile
### What changes were proposed in this pull request?
The current `ApproximatePercentile` supports `TimestampType`, but not supports timestamp without time zone yet.
This PR will add the function.

### Why are the changes needed?
`ApproximatePercentile` need supports `TimestampNTZType`.

### Does this PR introduce _any_ user-facing change?
'Yes'. `ApproximatePercentile` accepts `TimestampNTZType`.

### How was this patch tested?
New tests.

Closes #33241 from beliefer/SPARK-36017.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit cc4463e818)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-07 12:41:23 +03:00
Cheng Su 08cdd1fbcc [SPARK-32577][SQL][TEST] Fix the config value for shuffled hash join in test in-joins.sql
### What changes were proposed in this pull request?

We found the `in-join.sql` does not test shuffled hash join properly in https://issues.apache.org/jira/browse/SPARK-32577, but didn't find a good way to fix it. Given we now have a test config to enforce shuffled hash join in https://github.com/apache/spark/pull/33182, we can fix the test here now as well.

### Why are the changes needed?

Fix test to have better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reran the test to compare the output, and verified the query plan manually to make sure shuffled hash join being used.

Closes #33236 from c21/join-test.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f3c11595ce)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-07 18:16:45 +09:00
Linhong Liu f3ec79990f [SPARK-35984][SQL][TEST] Config to force applying shuffled hash join
### What changes were proposed in this pull request?
Add a config `spark.sql.join.forceApplyShuffledHashJoin` to force applying shuffled hash join
during the join selection.

### Why are the changes needed?
In the `SQLQueryTestSuite`, we want to cover 3 kinds of join (BHJ, SHJ, SMJ) in join.sql. But even
if the `spark.sql.join.preferSortMergeJoin` is set to `false`, shuffled hash join is still not guaranteed.
Thus, we need another config to force the selection.

### Does this PR introduce _any_ user-facing change?
No, only for testing

### How was this patch tested?
newly added tests
Verified all queries in join.sql will use `ShuffledHashJoin` when the config set to `true`

Closes #33182 from linhongliu-db/SPARK-35984-hash-join-config.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7566db6033)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-07 00:58:30 +08:00
Wenchen Fan 6c1c1af6b4 [SPARK-36020][SQL] Check logical link in remove redundant projects
### What changes were proposed in this pull request?

The `RemoveRedundantProjects` feature can conflict with the AQE broadcast threshold ([PR](https://github.com/apache/spark/pull/32391)) sometimes. After removing the project, the physical plan to logical plan link can be changed and we may have a `Project` above `LogicalQueryStage`. This breaks AQE broadcast threshold, because the stats of `Project` does not have the `isRuntime = true` flag, and thus still use the normal broadcast threshold.

This PR updates `RemoveRedundantProjects` to not remove `ProjectExec` that has a different logical plan link than its child.

### Why are the changes needed?

Make AQE broadcast threshold work in more cases.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #33222 from cloud-fan/aqe2.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6b3ab8262f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-06 21:17:51 +08:00
Kousuke Saruta 5f383f0102 [SPARK-35999][SQL] Make from_csv/to_csv to handle day-time intervals properly
### What changes were proposed in this pull request?

This PR fixes an issue that `from_csv/to_csv` doesn't handle day-time intervals properly.
`from_csv` throws exception if day-time interval types are given.
```
spark-sql> select from_csv("interval '1 2:3:4' day to second", "a interval day to second");
21/07/03 04:39:13 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1 2:3:4' day to second", "a interval day to second")]
java.lang.Exception: Unsupported type: interval day to second
 at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
 at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
 at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```

Also, `to_csv` doesn't handle day-time interval types properly though any exception is thrown.
The result of `to_csv` for day-time interval types is not ANSI interval compliant form.

```
spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second));
93784000000
```
The result above should be `INTERVAL '1 02:03:04' DAY TO SECOND`.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33226 from sarutak/csv-dtinterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit def8bc5c96)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-06 17:38:26 +08:00
Kousuke Saruta 634b2e265c [SPARK-35983][SQL] Allow from_json/to_json for map types where value types are day-time intervals
### What changes were proposed in this pull request?

This PR fixes two issues. One is that `to_json` doesn't support `map` types where value types are `day-time` interval types like:
```
spark-sql> select to_json(map('a', interval '1 2:3:4' day to second));
21/07/06 14:53:58 ERROR SparkSQLDriver: Failed in [select to_json(map('a', interval '1 2:3:4' day to second))]
java.lang.RuntimeException: Failed to convert value 93784000000 (class of class java.lang.Long) with the type of DayTimeIntervalType(0,3) to JSON.
```
The other issue is that even if the issue of `to_json` is resolved, `from_json` doesn't support to convert `day-time` interval string to JSON. So the result of following query will be `null`.
```
spark-sql> select from_json(to_json(map('a', interval '1 2:3:4' day to second)), 'a interval day to second');
{"a":null}
```

### Why are the changes needed?

There should be no reason why day-time intervals cannot used as map value types.
`CalendarIntervalTypes` can do it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33225 from sarutak/json-dtinterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit c8ff613c3c)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-06 11:07:10 +03:00
RoryQi 176b055c12 [SPARK-36011][SQL] Disallow altering permanent views based on temporary views or UDFs
### What changes were proposed in this pull request?
PR #15764 disabled creating permanent views based on temporary views or UDFs.  But AlterViewCommand didn't block temporary objects.

### Why are the changes needed?
More robust view canonicalization.

### Does this PR introduce _any_ user-facing change?
Yes, now if you alter a permanent view based on temporary views or UDFs, the operation will fail.

### How was this patch tested?
Add new unit tests.

Closes #33204 from jerqi/alter_view.

Authored-by: RoryQi <1242949407@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e0c6b2e965)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-06 14:56:25 +08:00
Yuanjian Li 22b303a648 [SPARK-35788][SS] Metrics support for RocksDB instance
### What changes were proposed in this pull request?
Add more metrics for the RocksDB instance. We transform the native states from RocksDB.

### Why are the changes needed?
Improve the usability with more metrics for RocksDB instance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #32934 from xuanyuanking/SPARK-35788.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 9544277b0a)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-06 11:12:37 +09:00
Wenchen Fan 0df89d8999 [SPARK-34302][SQL][FOLLOWUP] More code cleanup
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33113, to do some code cleanup:
1. `UnresolvedFieldPosition` doesn't need to include the field name. We can get it through "context" (`AlterTableAlterColumn.column.name`).
2. Run `ResolveAlterTableCommands` in the main resolution batch, so that the column/field resolution is also unified between v1 and v2 commands (same error message).
3. Fail immediately in `ResolveAlterTableCommands` if we can't resolve the field, instead of waiting until `CheckAnalysis`. We don't expect other rules to resolve fields in ALTER  TABLE commands, so failing immediately is simpler and we can remove duplicated code in `CheckAnalysis`.

### Why are the changes needed?

code simplification.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33213 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8b46e26fc6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-06 03:43:54 +08:00
Kousuke Saruta 544b7e16ac [SPARK-35998][SQL] Make from_csv/to_csv to handle year-month intervals properly
### What changes were proposed in this pull request?

This PR fixes an issue that `from_csv/to_csv` doesn't handle year-month intervals properly.
`from_csv` throws exception if year-month interval types are given.
```
spark-sql> select from_csv("interval '1-2' year to month", "a interval year to month");
21/07/03 04:32:24 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1-2' year to month", "a interval year to month")]
java.lang.Exception: Unsupported type: interval year to month
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```

Also, `to_csv` doesn't handle year-month interval types properly though any exception is thrown.
The result of `to_csv` for year-month interval types is not ANSI interval compliant form.

```
spark-sql> select to_csv(named_struct("a", interval '1-2' year to month));
14
```
The result above should be `INTERVAL '1-2' YEAR TO MONTH`.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33210 from sarutak/csv-yminterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit f4237aff7e)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-05 13:11:03 +03:00
ulysses-you ed7c81dfaa [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE
### What changes were proposed in this pull request?

Skip remove shuffle if it's shuffle origin is not `REPARTITION_BY_COL` in AQE.

### Why are the changes needed?

`REPARTITION_BY_COL` doesn't guarantee the output partitioning number so we can remove it safely in AQE.

For `REPARTITION_BY_NUM`, we should retain the shuffle which partition number is specified by user.
For `REBALANCE_PARTITIONS_BY_COL`, it is a special shuffle used to rebalance partitions so we should not remove it.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes #33188 from ulysses-you/SPARK-35989.

Lead-authored-by: ulysses-you <ulyssesyou18@gmail.com>
Co-authored-by: ulysses <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7fe4c4a9ad)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-05 17:10:57 +08:00
Cheng Su 39b3a04bfe [SPARK-35794][SQL] Allow custom plugin for AQE cost evaluator
### What changes were proposed in this pull request?

Current AQE has cost evaluator to decide whether to use new plan after replanning. The current used evaluator is `SimpleCostEvaluator` to make decision based on number of shuffle in the query plan. This is not perfect cost evaluator, and different production environments might want to use different custom evaluators. E.g., sometimes we might want to still do skew join even though it might introduce extra shuffle (trade off resource for better latency), sometimes we might want to take sort into consideration for cost as well. Take our own setting as an example, we are using a custom remote shuffle service (Cosco), and the cost model is more complicated. So We want to make the cost evaluator to be pluggable, and developers can implement their own `CostEvaluator` subclass and plug in dynamically based on configuration.

The approach is to introduce a new config to allow define sub-class name of `CostEvaluator` - `spark.sql.adaptive.customCostEvaluatorClass`. And add `CostEvaluator.instantiate` to instantiate the cost evaluator class in `AdaptiveSparkPlanExec.costEvaluator`.

### Why are the changes needed?

Make AQE cost evaluation more flexible.

### Does this PR introduce _any_ user-facing change?

No but an internal config is introduced - `spark.sql.adaptive.customCostEvaluatorClass` to allow custom implementation of `CostEvaluator`.

### How was this patch tested?

Added unit test in `AdaptiveQueryExecSuite.scala`.

Closes #32944 from c21/aqe-cost.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 044dddf288)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-05 09:06:49 +00:00
Kousuke Saruta 26bcf02833 [SPARK-35982][SQL] Allow from_json/to_json for map types where value types are year-month intervals
### What changes were proposed in this pull request?

This PR fixes two issues. One is that `to_json` doesn't support `map` types where value types are `year-month` interval types like:
```
spark-sql> select to_json(map('a', interval '1-2' year to  month));
21/07/02 11:38:15 ERROR SparkSQLDriver: Failed in [select to_json(map('a', interval '1-2' year to  month))]
java.lang.RuntimeException: Failed to convert value 14 (class of class java.lang.Integer) with the type of YearMonthIntervalType(0,1) to JSON.
```
The other issue is that even if the issue of `to_json` is resolved, `from_json` doesn't support to convert `year-month` interval string to JSON. So the result of following query will be `null`.
```
spark-sql> select from_json(to_json(map('a', interval '1-2' year to month)), 'a interval year to month');
{"a":null}
```

### Why are the changes needed?

There should be no reason why year-month intervals cannot used as map value types.
`CalendarIntervalTypes` can do it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33181 from sarutak/map-json-yminterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 6474226852)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-05 10:36:08 +03:00
Gengliang Wang ac1c6aa45c [SPARK-35987][SQL] The ANSI flags of Sum and Avg should be kept after being copied
### What changes were proposed in this pull request?

Make the ANSI flag part of expressions `Sum` and `Average`'s parameter list, instead of fetching it from the sessional SQLConf.

### Why are the changes needed?

For Views, it is important to show consistent results even the ANSI configuration is different in the running session. This is why many expressions like 'Add'/'Divide' making the ANSI flag part of its case class parameter list.

We should make it consistent for the expressions `Sum` and `Average`

### Does this PR introduce _any_ user-facing change?

Yes, the `Sum` and `Average` inside a View always behaves the same, independent of the ANSI model SQL configuration in the current session.

### How was this patch tested?

Existing UT

Closes #33186 from gengliangwang/sumAndAvg.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 51103cdcdd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-05 12:34:39 +08:00
Wenchen Fan ec84982191 [SPARK-35940][SQL] Refactor EquivalentExpressions to make it more efficient
### What changes were proposed in this pull request?

This PR uses 2 ideas to make `EquivalentExpressions` more efficient:
1. do not keep all the equivalent expressions, we only need a count
2. track the "height" of common subexpressions, to quickly do child-parent sort, and filter out non-child expressions in `addCommonExprs`

This PR also fixes several small bugs (exposed by the refactoring), please see PR comments.

### Why are the changes needed?

code cleanup and small perf improvement

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33142 from cloud-fan/codegen.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit e6ce220690)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-03 08:28:59 -07:00
Liang-Chi Hsieh 0db6f3a844 [SPARK-35785][SS][FOLLOWUP] Ignore concurrent update and cleanup test
### What changes were proposed in this pull request?

This patch ignores the test "ensure that concurrent update and cleanup consistent versions" in #32933. The test is currently flaky and we will address it later.

### Why are the changes needed?

Unblock other developments.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33195 from viirya/ignore-rocksdb-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a6e00ee9d7)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-02 10:58:59 -07:00
Dongjoon Hyun d7990943c3 [SPARK-35992][BUILD] Upgrade ORC to 1.6.9
### What changes were proposed in this pull request?

This PR aims to upgrade Apache ORC to 1.6.9.

### Why are the changes needed?

This is required to bring ORC-804 in order to fix ORC encryption masking bug.

### Does this PR introduce _any_ user-facing change?

No. This is not released yet.

### How was this patch tested?

Pass the newly added test case.

Closes #33189 from dongjoon-hyun/SPARK-35992.

Lead-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit c55b9fd1e0)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-02 09:50:00 -07:00
Yuanjian Li ae8f91734d [SPARK-35785][SS][3.2] Cleanup support for RocksDB instance
### What changes were proposed in this pull request?
Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.

### Why are the changes needed?
Part of the implementation of RocksDB state store.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #33184 from xuanyuanking/branch-3.2.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-07-02 17:12:00 +09:00
Wenchen Fan c1d8178817 [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing
### What changes were proposed in this pull request?

By default, AQE will set `COALESCE_PARTITIONS_MIN_PARTITION_NUM` to the spark default parallelism, which is usually quite big. This is to keep the parallelism on par with non-AQE, to avoid perf regressions.

However, this usually leads to many small/empty partitions, and hurts performance (although not worse than non-AQE). Users usually blindly set `COALESCE_PARTITIONS_MIN_PARTITION_NUM` to 1, which makes this config quite useless.

This PR adds a new config to set the min partition size, to avoid too small partitions after coalescing. By default, Spark will not respect the target size, and only respect this min partition size, to maximize the parallelism and avoid perf regression in AQE. This PR also adds a bool config to respect the target size when coalescing partitions, and it's recommended to set it to get better overall performance. This PR also deprecates the `COALESCE_PARTITIONS_MIN_PARTITION_NUM` config.

### Why are the changes needed?

AQE is default on now, we should make the perf better in the default case.

### Does this PR introduce _any_ user-facing change?

yes, a new config.

### How was this patch tested?

new tests

Closes #33172 from cloud-fan/aqe2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0c9c8ff569)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-02 16:07:46 +08:00