Commit graph

11450 commits

Author SHA1 Message Date
PengLei 61bd036cb9 [SPARK-35852][SQL] Use DateAdd instead of TimeAdd for DateType +/- INTERVAL DAY
### What changes were proposed in this pull request?
We use `DateAdd` to impl `DateType` `+`/`-`  `INTERVAL DAY`

### Why are the changes needed?
To improve the impl of `DateType` `+`/`-`  `INTERVAL DAY`
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut test

Closes #33033 from Peng-Lei/SPARK-35852.

Authored-by: PengLei <18066542445@189.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-24 08:47:29 +03:00
tanel.kiis@gmail.com b3a2cebc2b [SPARK-34807][SQL] Transpose Window nodes with Project between them
### What changes were proposed in this pull request?

Extend the `TransposeWindow` rule to transpose `Window` nodes, that have `Project` between them.

### Why are the changes needed?

The analyzer will turn a `dataset.withColumn("colName", expressionWithWindowFunction)` method call to a `Project - Window - Project` chain in the logical plan. When this method is called multiple times in a row, then the projects can block the `Window` nodes from being transposed by the current `TransposeWindow` rule.

TPCDS q47 and q57 are also improved by this.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31980 from tanelk/SPARK-34807_transpose_window.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-24 10:28:57 +08:00
Angerszhuuuu ad187227f1 [SPARK-35731][SQL][TESTS] Check all day-time interval types in arrow
### What changes were proposed in this pull request?
Check all day-time interval types in arrow.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT.

Closes #33039 from AngersZhuuuu/SPARK-35731.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 23:38:41 +03:00
Kousuke Saruta 2d3fa04e90 [SPARK-35729][SQL][TESTS] Check all day-time interval types in aggregate expressions
### What changes were proposed in this pull request?

This PR adds test to check `sum` and `avg` works with all the `DayTimeIntervalType`.
This PR also moves a dataframe commonly used by tests `SPARK-34837: Support ANSI SQL intervals by the aggregate function avg` and `SPARK-34716: Support ANSI SQL intervals by the aggregate function sum` to `SQLTestData.scala`, and a little bit modifies it.

### Why are the changes needed?

To ensure the results of aggregations are what is expected.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33042 from sarutak/check-interval-agg-dt.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 23:34:28 +03:00
Jungtaek Lim (HeartSaVioR) 476197791b [SPARK-34889][SS] Introduce MergingSessionsIterator merging elements directly which belong to the same session
Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces MergingSessionsIterator, which enables to merge elements belong to the same session directly.

MergingSessionsIterator is a variant of SortAggregateIterator which merges the session windows based on the fact input rows are sorted by "group keys + the start time of session window". When merging windows, MergingSessionsIterator also applies aggregations on merged window, which eliminates the necessity on buffering inputs (which requires copying rows) and update the session spec for each input.

MergingSessionsIterator is quite performant compared to UpdatingSessionsIterator brought by SPARK-34888. Note that MergingSessionsIterator can only apply to the cases aggregation can be applied altogether, so there're still rooms for UpdatingSessionIterator to be used.

This issue also introduces MergingSessionsExec which is the physical node on leveraging MergingSessionsIterator to sort the input rows and aggregate rows according to the session windows.

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite added.

Closes #31987 from HeartSaVioR/SPARK-34889-SPARK-10816-PR-31570-part-2.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-23 13:04:37 -07:00
Angerszhuuuu 077cf2acdb [SPARK-35733][SQL][TESTS] Check all day-time interval types in HiveInspectors tests
### What changes were proposed in this pull request?
Check all day-time interval types in HiveInspectors tests.

### Why are the changes needed?
New tests should improve test coverage for day-time interval types.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT.

Closes #33036 from AngersZhuuuu/SPARK-35733.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 19:20:51 +03:00
Chao Sun b8acbf6d88 [SPARK-35846][SQL] Introduce ParquetReadState to track various states while reading a Parquet column chunk
### What changes were proposed in this pull request?

Move all the bookkeeping states while scanning a Parquet column chunk into a single class `ParquetReadState`.

### Why are the changes needed?

As suggested [here](https://github.com/apache/spark/pull/32753#discussion_r655580942). To support column index in the vectorized reader path, we'll going to introduce more states to track. These are spread across different classes which make the code harder to maintain. Therefore, this proposes to move them into a single class so they can be managed better.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #33006 from sunchao/SPARK-35846.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-23 02:56:00 -07:00
Gengliang Wang 6f51e37eb5 [SPARK-35857][SQL] The ANSI flag of Cast should be kept after being copied
### What changes were proposed in this pull request?

Make the ANSI flag part of expression `Cast`'s  parameter list, instead of fetching it from the sessional SQLConf.

### Why are the changes needed?

For Views, it is important to show consistent results even the ANSI configuration is different in the running session. This is why many expressions like 'Add'/'Divide' making the ANSI flag part of its case class parameter list.

We should make it consistent for the expression `Cast`

### Does this PR introduce _any_ user-facing change?

Yes, the `Cast` inside a View always behaves the same, independent of the ANSI model SQL configuration in the current session.

### How was this patch tested?

Existing UT

Closes #33027 from gengliangwang/ansiFlagInCast.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-23 16:52:33 +08:00
Angerszhuuuu 758b423a31 [SPARK-35860][SQL] Support UpCast between different field of YearMonthIntervalType/DayTimeIntervalType
### What changes were proposed in this pull request?
Support UpCast between different field of YearMonthIntervalType/DayTimeIntervalType

### Why are the changes needed?
Since in our encoder we handle Period/Duration as default  YearMonthIntervalType/DayTimeIntervalType, when we use udf to handle this type, it will upcast all type of YearMonthIntervalType/DayTimeIntervalType to default YearMonthIntervalType/DayTimeIntervalType, so we need to support this.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added Ut

Closes #33035 from AngersZhuuuu/SPARK-35860.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 11:32:13 +03:00
Angerszhuuuu 7c1a9dd3f5 [SPARK-35776][SQL][TESTS] Check all year-month interval types in arrow
### What changes were proposed in this pull request?
Add tests to check that all year-month interval types are supported in (de-)serialization from/to Arrow format.

### Why are the changes needed?
New tests should improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added ut

Closes #32993 from AngersZhuuuu/SPARK-35776.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 10:59:50 +03:00
Peter Toth 79e3d0d98f [SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules
### What changes were proposed in this pull request?
This PR unifies reuse map data structures in non-AQE and AQE rules to a simple `Map[<canonicalized plan>, <plan>]` based on the discussion here: https://github.com/apache/spark/pull/28885#discussion_r655073897

### Why are the changes needed?
The proposed `Map[<canonicalized plan>, <plan>]` is simpler than the currently used `Map[<schema>, ArrayBuffer[<plan>]]` in `ReuseMap`/`ReuseExchangeAndSubquery` (non-AQE) and consistent with the `ReuseAdaptiveSubquery` (AQE) subquery reuse rule.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #33021 from peter-toth/SPARK-35855-unify-reuse-map-data-structures.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-23 07:20:47 +00:00
Wenchen Fan 20edfdd39a [SPARK-35845][SQL] OuterReference resolution should reject ambiguous column names
### What changes were proposed in this pull request?

The current OuterReference resolution is a bit weird: when the outer plan has more than one child, it resolves OuterReference from the output of each child, one by one, left to right.

This is incorrect in the case of join, as the column name can be ambiguous if both left and right sides output this column.

This PR fixes this bug by resolving OuterReference with `outerPlan.resolveChildren`, instead of something like `outerPlan.children.foreach(_.resolve(...))`

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

The problem only occurs in join, and join condition doesn't support correlated subquery yet. So this PR only improves the error message. Before this PR, people see
```
java.lang.UnsupportedOperationException
Cannot generate code for expression: outer(t1a#291)
```

### How was this patch tested?

a new test

Closes #33004 from cloud-fan/outer-ref.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-23 14:32:34 +08:00
Angerszhuuuu df55945804 [SPARK-35772][SQL][TESTS] Check all year-month interval types in HiveInspectors tests
### What changes were proposed in this pull request?
Check all year-month interval types in HiveInspectors tests.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT.

Closes #32970 from AngersZhuuuu/SPARK-35772.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 08:54:07 +03:00
Kousuke Saruta 4416b4b8ba [SPARK-35734][SQL][FOLLOWUP] IntervalUtils.toDayTimeIntervalString should consider the case a day-time type is casted as another day-time type
### What changes were proposed in this pull request?

This PR fixes an issue that `IntervalUtils.toDayTimeIntervalString` doesn't consider the case that a day-time interval type is casted as another day-time interval type.
if data of `interval day to second` is casted as `interval hour to second`, the value of the day is multiplied by 24 and added to the value of hour. For example, `INTERVAL '1 2' DAY TO HOUR` will be `INTERVAL '26' HOUR` if it's casted.
If this behavior is intended, it should be stringified as `INTERVAL '26' HOUR` but currently, it will be `INTERVAL '2' HOUR`

### Why are the changes needed?

t's a bug if the behavior of cast is intended.

### Does this PR introduce _any_ user-facing change?

No, because this feature is not released yet.

### How was this patch tested?

Modified the tests added in SPARK-35734 (#32891)

Closes #33031 from sarutak/fix-toDayTimeIntervalString.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-23 08:00:35 +03:00
Gengliang Wang 960a7e5fce [SPARK-35856][SQL][TESTS] Move new interval type test cases from CastSuite to CastBaseSuite
### What changes were proposed in this pull request?

There are a few test cases that are supposed to be in CastSuiteBase instead of CastSuite:

- SPARK-35112: Cast string to day-time interval
- SPARK-35111: Cast string to year-month interval
- SPARK-35820: Support cast DayTimeIntervalType in different fields
- SPARK-35819: Support cast YearMonthIntervalType in different fields

This PR is to move them to CastSuiteBase. Also, it adds comments for the scope of CastSuiteBase/CastSuite/AnsiCastSuiteBase.
### Why are the changes needed?

Increase test coverage so that we can test the casting under ANSI mode.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #33022 from gengliangwang/moveTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-23 11:20:50 +08:00
Wenchen Fan a87ee5d8b9 [SPARK-35695][SQL][FOLLOWUP] Use AQE helper to simplify the code in CollectMetricsExec
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32862 , to simplify the code with AQE helper.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33026 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-23 09:54:12 +09:00
Wenchen Fan 7a21e9c48f [SPARK-35858][SQL] SparkPlan.makeCopy should not set the active session
### What changes were proposed in this pull request?

We introduced `SparkSession.withActive` a while ago, and we use it when we need to run some code with a certain SparkSession as the active session.

Somehow we missed `SparkPlan.makeCopy`, which sets active session directly. This PR proposes to call `SparkSession.withActive` there.

### Why are the changes needed?

make sure we don't change the active session unexpectedly.

### Does this PR introduce _any_ user-facing change?

No. `makeCopy` is an internal function and I can't find a real case that this can change the active session. Mostly in an upper level, there is already a `SparkSession.withActive`, like `QueryExecution.executePhase`

### How was this patch tested?

existing tests

Closes #33029 from cloud-fan/minor1.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-23 09:50:59 +09:00
Wenchen Fan a2c1a55b1f [SPARK-35700][SQL][FOLLOWUP] Read schema from ORC files should strip CHAR/VARCHAR types
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33001 , to provide a more direct fix.

The regression in 3.1 was caused by the fact that we changed the parser and allow the parser to return CHAR/VARCHAR type. We should have replaced CHAR/VARCHAR with STRING before the data type flows into the query engine, however, `OrcUtils` is missed.

When reading ORC files, at the task side we will read the real schema from ORC file metadata, then apply filter pushdown. For some reason, the implementation turns ORC schema to Spark schema before filter pushdown, and this step does not strip CHAR/VARCHAR. Note, for Parquet we use the Parquet schema directly in filter pushdown, and do not this have problem.

This PR proposes to replace the CHAR/VARCHAR with STRING when turning ORC schema to Spark schema.

### Why are the changes needed?

a more directly bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33030 from cloud-fan/help.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-22 13:50:49 -07:00
Li Zhang dfd7b026dc [SPARK-35800][SS] Improving GroupState testability by introducing TestGroupState
### What changes were proposed in this pull request?
Proposed changes in this pull request:

1. Introducing the `TestGroupState` interface which is inherited from `GroupState` so that testing related getters can be exposed in a controlled manner
2. Changing `GroupStateImpl` to inherit from `TestGroupState` interface, instead of directly from `GroupState`
3. Implementing `TestGroupState` object with `create()` method to forward inputs to the private `GroupStateImpl` constructor
4. User input validations have been added into `GroupStateImpl`'s `createForStreaming()` method to prevent users from creating invalid GroupState objects.
5. Replacing existing `GroupStateImpl` usages in sql pkg internal unit tests with the newly added `TestGroupState` to give user best practice about `TestGroupState` usage.

With the changes in this PR, the class hierarchy is changed from `GroupStateImpl` -> `GroupState` to `GroupStateImpl` -> `TestGroupState` -> `GroupState` (-> means inherits from)

### Why are the changes needed?
The internal `GroupStateImpl` implementation for the `GroupState` interface has no public constructors accessible outside of the sql pkg. However, the user-provided state transition function for `[map|flatMap]GroupsWithState` requires a `GroupState` object as the prevState input.

Currently, users are calling the Structured Streaming engine in their unit tests in order to instantiate such `GroupState` instances, which makes UTs cumbersome.

The proposed `TestGroupState` interface is to give users controlled access to the `GroupStateImpl` internal implementation to largely improve testability of Structured Streaming state transition functions.

**Usage Example**
```
import org.apache.spark.sql.streaming.TestGroupState

test(“Structured Streaming state update function”) {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}
```

### Does this PR introduce _any_ user-facing change?
Yes, the `TestGroupState` interface and its corresponding `create()` factory function in its companion object are introduced in this pull request for users to use in unit tests.

### How was this patch tested?
- New unit tests are added
- Existing GroupState unit tests are updated

Closes #32938 from lizhangdatabricks/improve-group-state-testability.

Authored-by: Li Zhang <li.zhang@databricks.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2021-06-22 15:04:01 -04:00
Gengliang Wang ce53b7199d [SPARK-35854][SQL] Improve the error message of to_timestamp_ntz with invalid format pattern
### What changes were proposed in this pull request?

When SQL function `to_timestamp_ntz` has invalid format pattern input, throw a runtime exception with hints for the valid patterns, instead of throwing an upgrade exception with suggestions to use legacy formatters.

### Why are the changes needed?

As discussed in https://github.com/apache/spark/pull/32995/files#r655148980, there is an error message saying
"You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyy-MM-dd GGGGG' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0"

This is not true for function to_timestamp_ntz, which only uses the Iso8601TimestampFormatter and added since Spark 3.2. We should improve it.

### Does this PR introduce _any_ user-facing change?

No, the new SQL function is not released yet.

### How was this patch tested?

Unit test

Closes #33019 from gengliangwang/improveError.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-22 23:45:54 +08:00
Lei Peng bc61b62a55 [SPARK-35727][SQL] Return INTERVAL DAY from dates subtraction
What changes were proposed in this pull request?

1. Change the return value type from DayTimeIntervalType(DAY, SECOND) to DayTimeIntervalType(DAY, DAY) of SubtractDates.

Why are the changes needed?
https://issues.apache.org/jira/browse/SPARK-35727

Does this PR introduce any user-facing change?
no

How was this patch tested?
existed ut test

Closes #32999 from Peng-Lei/SPARK-35727.

Lead-authored-by: Lei Peng <peng.8lei@gmail.com>
Co-authored-by: PengLei <18066542445@189.cn>
Co-authored-by: Peng-Lei <peng.8lei@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-22 13:43:25 +00:00
YangJie 6c05459600 [SPARK-35838][BUILD][TESTS] Ensure all modules can be maven test independently in Scala 2.13
### What changes were proposed in this pull request?
Similar to SPARK-35532, the main change of this pr is add `scala-2.13` profile to external/kafka-0-10-sql/pom.xml, external/avro/pom.xml and sql/hive-thriftserver/pom.xml,  the `scala-2.13` profile include dependency on `scala-parallel-collections_2.13`, then all(34) spark modules can maven test independently.

### Why are the changes needed?
Ensure alll(34) spark modules can be maven test independently in Scala 2.13

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass the GitHub Action Scala 2.13 job
- Manual test:

1. Execute
```
dev/change-scala-version.sh 2.13

mvn clean install -DskipTests -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13
```

2. maven test `external/kafka-0-10-sql` module
```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/kafka-0-10-sql
```

**before**

```
Discovery starting.
Discovery completed in 857 milliseconds.
Run starting. Expected test count is: 464
...
KafkaRelationSuiteV2:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- default starting and ending offsets with headers
- timestamp provided for starting and ending
- timestamp provided for starting, offset provided for ending
- timestamp provided for ending, offset provided for starting
- timestamp provided for starting, ending not provided
- timestamp provided for ending, starting not provided
- global timestamp provided for starting and ending
- no matched offset for timestamp - startingOffsets
- preferences on offset related options
- no matched offset for timestamp - endingOffsets
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 33 minutes, 51 seconds.
Total number of tests run: 464
Suites: completed 31, aborted 0
Tests: succeeded 464, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

3. maven test `external/avro` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl external/avro
```

**before**

```
Discovery starting.
Discovery completed in 2 seconds, 765 milliseconds.
Run starting. Expected test count is: 255
AvroReadSchemaSuite:
- append column at the end
- hide column at the end
- append column into middle
- hide column in the middle
- add a nested column at the end of the leaf struct column
- add a nested column in the middle of the leaf struct column
- add a nested column at the end of the middle struct column
- add a nested column in the middle of the middle struct column
- hide a nested column at the end of the leaf struct column
- hide a nested column in the middle of the leaf struct column
- hide a nested column at the end of the middle struct column
- hide a nested column in the middle of the middle struct column
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
  ...
  Cause: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
  at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
  at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
  ...
```

**After**

```
Run completed in 1 minute, 42 seconds.
Total number of tests run: 255
Suites: completed 12, aborted 0
Tests: succeeded 255, failed 0, canceled 0, ignored 2, pending 0
All tests passed.
```

4.  maven test `sql/hive-thriftserver` module

```
mvn test -Phadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -pl sql/hive-thriftserver
```

**before**

```
- union.sql *** FAILED ***
  "1  a
  1 a
  2 b
  2 b" did not contain "Exception" Exception did not match for query #2
  SELECT *
  FROM   (SELECT * FROM t1
          UNION ALL
          SELECT * FROM t1), expected: 1  a
  1 a
  2 b
  2 b, but got: java.sql.SQLException
  org.apache.hive.service.cli.HiveSQLException: Error running query: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:38)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:324)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:229)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:229)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:224)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:238)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.NoClassDefFoundError: scala/collection/parallel/TaskSupport
    at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1411)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:788)
    at org.apache.spark.SparkContext.union(SparkContext.scala:1405)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:697)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:182)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:220)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:217)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:178)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:323)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:389)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3719)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2987)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3710)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:774)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3708)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2987)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:299)
    ... 16 more
  Caused by: java.lang.ClassNotFoundException: scala.collection.parallel.TaskSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 40 more (ThriftServerQueryTestSuite.scala:209)
```

**After**

```
Run completed in 29 minutes, 17 seconds.
Total number of tests run: 535
Suites: completed 20, aborted 0
Tests: succeeded 535, failed 0, canceled 0, ignored 17, pending 0
All tests passed.
```

Closes #32994 from LuciferYang/SPARK-35838.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-22 06:31:24 -07:00
Angerszhuuuu 5a510cf578 [SPARK-35726][SPARK-35769][SQL][FOLLOWUP] Call periodToMonths and durationToMicros in HiveResult should add endField
### What changes were proposed in this pull request?
When we call periodToMonths and durationToMicros  with certain type field, we should pass endField parameter.

### Why are the changes needed?
When we call periodToMonths and durationToMicros  with certain type field, we should pass endField parameter.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed UT

Closes #32984 from AngersZhuuuu/SPARK-35726-35769.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-22 11:15:35 +03:00
gengjiaan 43cd6ca687 [SPARK-35378][SQL][FOLLOWUP] isLocal should consider CommandResult
### What changes were proposed in this pull request?
#32513 added the case class `CommandResult` so as we can eagerly execute command locally. But we forgot to update
`isLocal` of `Dataset`.

### Why are the changes needed?
`Dataset.isLocal` should consider `CommandResult`.

### Does this PR introduce _any_ user-facing change?
Yes. If the SQL plan is `CommandResult`, `Dataset.isLocal` must return true.

### How was this patch tested?
No test.

Closes #32963 from beliefer/SPARK-35378-followup2.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-22 07:39:54 +00:00
Venki Korukanti d4d11cfbfb [SPARK-35799][SS] Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
### What changes were proposed in this pull request?

Fix how we measure the metric `allUpdatesTimeMs` in `FlatMapGroupsWithStateExec` similar to other streaming stateful operators.

### Why are the changes needed?

Metric `allUpdatesTimeMs` meant to capture the start to end walltime of the operator `FlatMapGroupsWithStateExec`, but currently it just [captures](79362c4efc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala (L121)) the iterator creation time.

Fix it to measure similar to how other stateful operators measure. Example one [here](79362c4efc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala (L406)). This measurement is not perfect due to the nature of the lazy iterator and also includes the time the consumer operator spent in processing the current operator output, but it should give a good signal when comparing the metric in one microbatch to the metric in another microbatch.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UTs for regression. Due to the nature of metric type (time), it is hard to write a UT, but have manually verified.

Closes #32952 from vkorukanti/SPARK-35799.

Authored-by: Venki Korukanti <venki.korukanti@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-22 13:57:21 +09:00
Kent Yao 9f734978d9 [SPARK-35700][SQL] Read char/varchar orc table with created and written by external systems
### What changes were proposed in this pull request?

The char/varchar type should be mapped to orc's string type too, see https://orc.apache.org/docs/types.html

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #33001 from yaooqinn/SPARK-35700.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-21 19:20:55 -07:00
Gengliang Wang 2bdd9fe5e3 [SPARK-35839][SQL] New SQL function: to_timestamp_ntz
### What changes were proposed in this pull request?

Implement new SQL function: `to_timestamp_ntz`.
The syntax is similar to the built-in function `to_timestamp`:
```
to_timestamp_ntz ( <date_expr> )

to_timestamp_ntz ( <timestamp_expr> )

to_timestamp_ntz ( <string_expr> [ , <format> ] )
```

The naming is from snowflake: https://docs.snowflake.com/en/sql-reference/functions/to_timestamp.html

### Why are the changes needed?

Adds a new SQL function to create a literal/column of timestamp without time zone.
It's convenient for both end-users and developers.

### Does this PR introduce _any_ user-facing change?

Yes, a new SQL function `to_timestamp_ntz`.

### How was this patch tested?

Unit tests

Closes #32995 from gengliangwang/toTimestampNtz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-22 09:50:48 +08:00
Kousuke Saruta 2c91672259 [SPARK-35775][SQL][TESTS] Check all year-month interval types in aggregate expressions
### What changes were proposed in this pull request?

This PR adds test to check `sum` and `avg` works with all the `YearMonthInterval` types.

### Why are the changes needed?

To ensure the results of aggregations are what is expected.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32988 from sarutak/check-interval-agg-ym.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-21 16:47:29 +03:00
tanel.kiis@gmail.com f80be4187e [SPARK-34565][SQL] Collapse Window nodes with Project between them
### What changes were proposed in this pull request?

Extend the `CollapseWindow` rule to collapse `Window` nodes, that have `Project` between them.

### Why are the changes needed?

The analyzer will turn a `dataset.withColumn("colName", expressionWithWindowFunction)` method call to a `Project - Window - Project` chain in the logical plan. When this method is called multiple times in a row, then the projects can block the `Window` nodes from being collapsed by the current `CollapseWindow` rule.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31677 from tanelk/SPARK-34565_collapse_windows.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-21 22:10:49 +09:00
Max Gekk 37ef7bb98c [SPARK-35840][SQL] Add apply() for a single field to YearMonthIntervalType and DayTimeIntervalType
### What changes were proposed in this pull request?
In the PR, I propose to add 2 new methods that accept one field and produce either `YearMonthIntervalType` or `DayTimeIntervalType`.

### Why are the changes needed?
To improve code maintenance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By existing test suites.

Closes #32997 from MaxGekk/ansi-interval-types-single-field.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-21 14:15:33 +03:00
Angerszhuuuu 1488ea9a8c [SPARK-35820][SQL] Support Cast between different field DayTimeIntervalType
### What changes were proposed in this pull request?
 Support Cast between different field DayTimeIntervalType

### Why are the changes needed?
Make user convenient to get different field DayTimeIntervalType

### Does this PR introduce _any_ user-facing change?
User can call cast DayTimeIntervalType(DAY, SECOND) to DayTimeIntervalType(DAY, MINUTE) etc

### How was this patch tested?
Added UT

Closes #32975 from AngersZhuuuu/SPARK-35820.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-21 12:36:38 +03:00
yi.wu 974d127c4f [SPARK-35545][FOLLOW-UP][TEST][SQL] Add a regression test for the SubqueryExpression refactor
### What changes were proposed in this pull request?

Add a test.

### Why are the changes needed?

The SubqueryExpression refactor PR https://github.com/apache/spark/pull/32687 actually fixes the bug of `SubqueryExpression.references`. So this follow-up PR adds a regression unit test for it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test.

Closes #32990 from Ngone51/spark-35545-followup.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-21 09:54:55 +03:00
Peter Toth 682e7f2033 [SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse
### What changes were proposed in this pull request?
This PR:
1. Fixes an issue in `ReuseExchange` rule that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate traversals in `ReuseExchange` when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   )
   SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
   :           :                 +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :              +- *(1) Project [k#17L]
   :                 +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
   :           :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   :           :                    +- *(1) Project [k#17L]
   :           :                       +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :           :                          +- *(1) ColumnarToRow
   :           :                             +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :           +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   ```
2. Fixes an issue with separate consecutive `ReuseExchange` and `ReuseSubquery` rules that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate rules when `ReuseSubquery` rule modifies an exchange that has already been referenced (reused) in `ReuseExchange` rule.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   ),
   t2 AS (
     SELECT * FROM t
     UNION
     SELECT * FROM t
   )
   SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :        +- *(1) Project [k#49L]
   :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :              +- *(1) ColumnarToRow
   :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                 :     :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 :     :                    +- *(1) Project [k#49L]
   :                 :     :                       +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :     :                          +- *(1) ColumnarToRow
   :                 :     :                             +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 :     +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   ```
   (This example contains issue 1 as well.)

3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means that the new combined rule utilizes the reuse opportunities between parent and subqueries by traversing the whole plan. The traversal is started on the top level query only.

4. Due to the order of traversal this PR does while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from execution perspective, but this also means "forward references" in explain formatted output where parent queries come first. The changes I made to `ExplainUtils` are to handle these references properly.

This PR fixes the above 3 issues by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a 1 pass, whole-plan, bottom-up traversal.

### Why are the changes needed?
Performance improvement.

### How was this patch tested?
- New UTs in `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- New UTs in `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- New `ReuseMapSuite` to test `ReuseMap`.
- Checked new golden files of `PlanStabilitySuite`s for invalid reuse references.
- TPCDS benchmarks.

Closes #28885 from peter-toth/SPARK-29375-SPARK-28940-whole-plan-reuse.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-21 04:53:19 +00:00
Kousuke Saruta af20474c67 [SPARK-35827][SQL] Show proper error message when update column types to year-month/day-time interval
### What changes were proposed in this pull request?

This PR fixes error message shown when changing a column type to year-month/day-time interval type is attempted.

### Why are the changes needed?

It's for consistent behavior.
Updating column types to interval types are prohibited for V2 source tables.
So, if we attempt to update the type of a column to the conventional interval type, an error message like `Error in query: Cannot update <table> field <column> to interval type;`.

But, for year-month/day-time interval types, another error message like `Error in query: Cannot update <table> field <column>:<type> cannot be cast to interval year;`.

You can reproduce with the following procedure.
```
$ bin/spark-sql
spark-sql> SET spark.sql.catalog.mycatalog=<a catalog implementation class>;
spark-sql> CREATE TABLE mycatalog.t1(c1 int) USING <V2 datasource implementation class>;
spark-sql> ALTER TABLE mycatalog.t1 ALTER COLUMN c1 TYPE interval year to month;
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified an existing test.

Closes #32978 from sarutak/err-msg-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-20 23:39:46 +03:00
Kousuke Saruta 4758dc78a2 [SPARK-35771][SQL][FOLLOWUP] IntervalUtils.toYearMonthIntervalString should consider the case year-month type is casted as month type
### What changes were proposed in this pull request?

This PR fixes an issue that `IntervalUtils.toYearMonthIntervalString` doesn't consider the case that year-month interval type is casted as month interval type.
If a year-month interval data is casted as month interval, the value of the year is multiplied by `12` and added to the value of month. For example, `INTERVAL '1-2' YEAR TO MONTH` will be `INTERVAL '14' MONTH` if  it's casted.
If this behavior is intended, it's stringified to be `'INTERVAL 14' MONTH` but currently, it will be `INTERVAL '2' MONTH`

### Why are the changes needed?

It's a bug if the behavior of cast is intended.

### Does this PR introduce _any_ user-facing change?

No, because this feature is not released yet.

### How was this patch tested?

Modified the tests added in SPARK-35771 (#32924).

Closes #32982 from sarutak/fix-toYearMonthIntervalString.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-20 10:32:21 +03:00
Angerszhuuuu 86bcd1fba0 [SPARK-35819][SQL] Support Cast between different field YearMonthIntervalType
### What changes were proposed in this pull request?
 Support Cast between different field YearMonthIntervalType

### Why are the changes needed?
Make user convenient to get different field YearMonthIntervalType

### Does this PR introduce _any_ user-facing change?
User can call cast YearMonthIntervalType(YEAR, MONTH) to YearMonthIntervalType(YEAR, YEAR) etc

### How was this patch tested?
Added UT

Closes #32974 from AngersZhuuuu/SPARK-35819.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-19 21:43:06 +03:00
Angerszhuuuu 2ebad72758 [SPARK-35726][SQL] Truncate java.time.Duration by fields of day-time interval type
### What changes were proposed in this pull request?
Support truncate java.time.Duration by fields of day-time interval type.

### Why are the changes needed?
To respect fields of the target day-time interval types.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #32950 from AngersZhuuuu/SPARK-35726.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-19 13:51:21 +03:00
Liang-Chi Hsieh 882122d6b7 [SPARK-35565][SS] Add config for ignoring metadata directory of FileStreamSink
### What changes were proposed in this pull request?

This patch proposes to add an internal config for ignoring metadata of `FileStreamSink` when reading the output path.

### Why are the changes needed?

`FileStreamSink` produces a metadata directory which logs output files per micro-batch. When we read from the output path, Spark will look at the metadata and ignore other files not in the log.

Normally it works well. But for some use-cases, we may need to ignore the metadata when reading the output path. For example, when we change the streaming query and must to run it with new checkpoint directory, we cannot use previous metadata. If we create a new metadata too, when we read the output path later in Spark, Spark only reads the files listed in the new metadata. The files written before we use new checkpoint and metadata are ignored by Spark.

Although seems we can output to different output directory every time, but it is bad idea as we will produce many directories unnecessarily.

We need a config for ignoring the metadata of `FileStreamSink` when reading the output path.

### Does this PR introduce _any_ user-facing change?

Added a config for ignoring metadata of FileStreamSink when reading the output.

### How was this patch tested?

Unit tests.

Closes #32702 from viirya/ignore-metadata.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-19 08:20:58 +09:00
Yuming Wang 7be8d8a164 [SPARK-35185][SQL] Improve Distinct statistics estimation
### What changes were proposed in this pull request?

This PR improves `Distinct` statistics estimation by rewrite it to `Aggregate`.

### Why are the changes needed?

1. The current implementation will lack column statistics.
2. Some rules before the `ReplaceDistinctWithAggregate` may use it. For example: https://github.com/apache/spark/pull/31113/files#diff-11264d807efa58054cca2d220aae8fba644ee0f0f2a4722c46d52828394846efR1808

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32291 from wangyum/SPARK-35185.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-18 21:48:44 +08:00
ulysses-you 2c4598d02e [SPARK-35608][SQL] Support AQE optimizer side transformUpWithPruning
### What changes were proposed in this pull request?

Change `AQEPropagateEmptyRelation` from `transformUp` to `transformUpWithPruning

### Why are the changes needed?

To avoid unnecessary iteration during AQE optimizer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #32742 from ulysses-you/aqe-transformUpWithPruning.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-18 20:31:11 +08:00
Angerszhuuuu 071566caf3 [SPARK-35769][SQL] Truncate java.time.Period by fields of year-month interval type
### What changes were proposed in this pull request?
Support truncate java.time.Period by fields of year-month interval type

### Why are the changes needed?
To follow the SQL standard and respect the field restriction of the target year-month type.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #32945 from AngersZhuuuu/SPARK-35769.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-18 11:55:57 +03:00
Kousuke Saruta 45b7f76295 [SPARK-35095][SS][TESTS] Use ANSI intervals in streaming join tests
### What changes were proposed in this pull request?

This PR extends the following tests to use day-time intervals.

* StreamingOuterJoinSuite.right outer with watermark range condition
* StreamingOuterJoinSuite.left outer with watermark range condition

### Why are the changes needed?

Currently, there are no tests to use day-time intervals.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertions.

Closes #32953 from sarutak/stream-join-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-17 22:48:18 +03:00
Gengliang Wang 05e2b76852 [SPARK-35720][SQL] Support casting of String to timestamp without time zone type
### What changes were proposed in this pull request?

Extend the Cast expression and support StringType in casting to TimestampWithoutTZType.

Closes #32898

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32936 from gengliangwang/castStringToTswtz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-18 02:02:10 +08:00
allisonwang-db 0d900b6cfa [SPARK-35789][SQL] Refine lateral join syntax to only allow subqueries
### What changes were proposed in this pull request?
This PR is a follow-up for SPARK-34382. It refines the lateral join syntax to only allow the LATERAL keyword to be in front of subqueries, instead of all `relationPriamry`. For example, `SELECT * FROM t1, LATERAL t2` should not be allowed.

### Why are the changes needed?
To be consistent with Postgres.

### Does this PR introduce _any_ user-facing change?
Yes. After this PR, the LATERAL keyword can only be in front of subqueries.

```scala
sql("SELECT * FROM t1, LATERAL t2")

org.apache.spark.sql.catalyst.parser.ParseException:
LATERAL can only be used with subquery(line 1, pos 26)

== SQL ==
select * from t1, lateral t2
--------------------------^^^
```

### How was this patch tested?
New unit tests.

Closes #32937 from allisonwang-db/spark-35789-lateral-join-parser.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 16:47:30 +00:00
gengjiaan ee2d8ae322 [SPARK-35378][SQL][FOLLOWUP] Move CommandResult to catalyst.plans.logical
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/32513 added the case class `CommandResult` in package `org.apache.spark.sql.expression`. It is not suitable, so this PR move `CommandResult` from `org.apache.spark.sql.expression` to `org.apache.spark.sql.catalyst.plans.logical`.

### Why are the changes needed?
Make `CommandResult` in suitable package.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
No need.

Closes #32942 from beliefer/SPARK-35378-followup.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-17 07:47:38 -07:00
Peter Toth abf9675a75 [SPARK-35798][SQL] Fix SparkPlan.sqlContext usage
### What changes were proposed in this pull request?
There might be `SparkPlan` nodes where canonicalization on executor side can cause issues. This is a follow-up fix to conversation https://github.com/apache/spark/pull/32885/files#r651019687.

### Why are the changes needed?
To avoid potential NPEs when canonicalization happens on executors.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #32947 from peter-toth/SPARK-35798-fix-sparkplan.sqlcontext-usage.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 13:49:38 +00:00
Linhong Liu b86a69f026 [SPARK-35792][SQL] View should not capture configs used in RelationConversions
### What changes were proposed in this pull request?
`RelationConversions` is actually an optimization rule while it's executed in the analysis phase.
For view, it's designed to only capture semantic configs, so we should ignore the optimization
configs that will be used in the analysis phase.

This PR also fixes the issue that view resolution will always use the default value for uncaptured config

### Why are the changes needed?
Bugfix

### Does this PR introduce _any_ user-facing change?
Yes, after this PR view resolution will respect the values set in the current session for the below configs
```
"spark.sql.hive.convertMetastoreParquet"
"spark.sql.hive.convertMetastoreOrc"
"spark.sql.hive.convertInsertingPartitionedTable"
"spark.sql.hive.convertMetastoreCtas"
```

### How was this patch tested?
By running new UT:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveSQLViewSuite"
```

Closes #32941 from linhongliu-db/SPARK-35792-ignore-convert-configs.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 21:40:53 +08:00
Angerszhuuuu 234163fbe0 [SPARK-35732][SQL] Parse DayTimeIntervalType from JSON
### What changes were proposed in this pull request?
Support Parse DayTimeIntervalType from JSON

### Why are the changes needed?
this will allow to store day-second intervals as table columns into Hive external catalog.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #32930 from AngersZhuuuu/SPARK-35732.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-17 12:54:34 +03:00
Wenchen Fan 0c5a01a78c [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32513

It's hard to keep the command execution name for `DataFrameWriter`, as the command logical plan is a bit messy (DS v1, file source and hive and different command logical plans) and sometimes it's hard to distinguish "insert" and "save".

However, `DataFrameWriterV2` only produce v2 commands which are pretty clean. It's easy to keep the command execution name for them.

### Why are the changes needed?

less breaking changes.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #32919 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 08:55:42 +00:00
copperybean 939ae91e00 [SPARK-35130][SQL] Add make_dt_interval function to construct DayTimeIntervalType value
### What changes were proposed in this pull request?
Providing a new function make_dt_interval to construct DayTimeIntervalType value

### Why are the changes needed?
As the JIRA described, we should provide a function to construct DayTimeIntervalType value

### Does this PR introduce _any_ user-facing change?
Yes, a new make_dt_interval function provided

### How was this patch tested?
Updated UTs, manual testing

Closes #32601 from copperybean/work.

Authored-by: copperybean <copperybean.zhang@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-17 10:01:16 +03:00
Angerszhuuuu 0e554d44df [SPARK-35770][SQL] Parse YearMonthIntervalType from JSON
### What changes were proposed in this pull request?
Parse YearMonthIntervalType from JSON.

### Why are the changes needed?
This will allow to store year-month intervals as table columns into Hive external catalog.

### Does this PR introduce _any_ user-facing change?
People can store year-month interval types as json string.

### How was this patch tested?
Added UT.

Closes #32929 from AngersZhuuuu/SPARK-35770.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-17 09:51:47 +03:00
Cheng Su e0d81d9b71 [SPARK-35791][SQL] Release on-going map properly for NULL-aware ANTI join
### What changes were proposed in this pull request?

NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) detects NULL join keys during building the map for `HashedRelation`, and will immediately return `HashedRelationWithAllNullKeys` without taking care of the map built already. Before returning `HashedRelationWithAllNullKeys`, the map needs to be freed properly to save memory and keep memory accounting correctly.

### Why are the changes needed?

Save memory and keep memory accounting correctly for the join query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests introduced in https://github.com/apache/spark/pull/29104 .

Closes #32939 from c21/free-null-aware.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-17 13:57:35 +08:00
weixiuli 947c7ea27c [SPARK-35783][SQL] Set the list of read columns in the task configuration to reduce reading of ORC data
### What changes were proposed in this pull request?
Set the list of read columns in the task configuration to reduce reading of ORC data.
### Why are the changes needed?
Now, the ORC reader will read all columns of the ORC table when the task configuration does not set the list of read columns . Therefore, we should set the list of read columns in the task configuration to reduce reading of ORC data.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exist unittests

Closes #32923 from weixiuli/SPARK-35783.

Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-16 22:06:31 -07:00
Venki Korukanti 8e594f084a [SPARK-35763][SS] Remove the StateStoreCustomMetric subclass enumeration dependency
### What changes were proposed in this pull request?

Remove the usage of the enumerating subclasses of `StateStoreCustomMetric` dependency.

To achieve it, add couple of utility methods to `StateStoreCustomMetric`
* `withNewDesc(desc : String)` to `StateStoreCustomMetric` for cloning the instance with a new `desc` (currently used in `SymmetricHashJoinStateManager`)
* `createSQLMetric(sparkContext: sparkContext): SQLMetric` for creating a corresponding `SQLMetric` to show the metric in UI and accumulate at the query level (currently used in `statefulOperator. stateStoreCustomMetrics`)

### Why are the changes needed?

Code in [SymmetricHashJoinStateManager](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L321) and [StateStoreWriter](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L129) rely on the subclass implementations of [StateStoreCustomMetric](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L187).

If a new subclass of `StateStoreCustomMetric` is added, it requires code changes to `SymmetricHashJoinStateManager` and `StateStoreWriter`, and we may miss the update if there is no existing test coverage.

To prevent these issues add a couple of utility methods to `StateStoreCustomMetric` as mentioned above.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT and a new UT

Closes #32914 from vkorukanti/SPARK-35763.

Authored-by: Venki Korukanti <venki.korukanti@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-17 07:48:24 +09:00
Chao Sun 506ef9aad7 [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1
### What changes were proposed in this pull request?

This upgrade default Hadoop version from 3.2.1 to 3.3.1. The changes here are simply update the version number and dependency file.

### Why are the changes needed?

Hadoop 3.3.1 just came out, which comes with many client-side improvements such as for S3A/ABFS (20% faster when accessing S3). These are important for users who want to use Spark in a cloud environment.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing unit tests in Spark
- Manually tested using my S3 bucket for event log dir:
```
bin/spark-shell \
  -c spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID \
  -c spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY \
  -c spark.eventLog.enabled=true
  -c spark.eventLog.dir=s3a://<my-bucket>
```
- Manually tested against docker-based YARN dev cluster, by running `SparkPi`.

Closes #30135 from sunchao/SPARK-29250.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-16 13:28:07 -07:00
YangJie 87bf6b0ea4 [SPARK-35556][SQL] Remove close HiveClient's SessionState
### What changes were proposed in this pull request?

It will not generate `tmpOutputFile`, `tmpErrOutputFile` and `sessionDirs` since [SPARK-35286](https://issues.apache.org/jira/browse/SPARK-35286). So we can remove `HiveClientImpl.closeState` to avoid these exceptions:
```
java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File
```

### Why are the changes needed?

1. Avoid incompatible exceptions.
2. Remove useless code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the GitHub Action
- Manual test:

Execute

```
mvn clean install -DskipTests -pl sql/hive -am
mvn test -pl sql/hive -DwildcardSuites=org.apache.spark.sql.hive.client.VersionsSuite -Dtest=none
```

**Before**

```
Run completed in 17 minutes, 18 seconds.
Total number of tests run: 867
Suites: completed 2, aborted 0
Tests: succeeded 867, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
15:04:02.407 WARN org.apache.hadoop.hive.metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
15:04:02.408 WARN org.apache.hadoop.hive.metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore yangjie010.2.30.21
15:04:02.441 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
15:04:03.140 ERROR org.apache.spark.util.Utils: Uncaught exception in thread shutdown-hook-0
java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$closeState$1(HiveClientImpl.scala:168)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:312)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:243)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:242)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:292)
	at org.apache.spark.sql.hive.client.HiveClientImpl.closeState(HiveClientImpl.scala:158)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$new$1(HiveClientImpl.scala:175)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1994)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
15:04:03.141 WARN org.apache.hadoop.util.ShutdownHookManager: ShutdownHook '$anon$2' failed, java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$closeState$1(HiveClientImpl.scala:168)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:312)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:243)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:242)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:292)
	at org.apache.spark.sql.hive.client.HiveClientImpl.closeState(HiveClientImpl.scala:158)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$new$1(HiveClientImpl.scala:175)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1994)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

**After**

```
Run completed in 11 minutes, 41 seconds.
Total number of tests run: 867
Suites: completed 2, aborted 0
Tests: succeeded 867, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
```

Closes #32693 from LuciferYang/SPARK-35556.

Lead-authored-by: YangJie <yangjie01@baidu.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-16 23:30:30 +08:00
Wenchen Fan a2961ddfdf [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
### What changes were proposed in this pull request?

Currently, `ResolveAggregateFunctions` is a complicated rule that recursively calls the entire analyzer to resolve aggregate functions in parent nodes of aggregate. It's kind of necessary as we need to do many things to identify the aggregate function and push it down to the aggregate node: resolve columns as if they are in the aggregate node, resolve functions, apply type coercion, etc. However, this is overly complicated and it's hard to fully understand how the resolution is done there. It also leads to hacks such as the [char/varchar hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2396-L2401), [subquery hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2274-L2277), [grouping function hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2465-L2467), etc.

This PR simplifies the `ResolveAggregateFunctions` rule and clarifies the resolution logic. To resolve aggregate functions/grouping columns in HAVING, ORDER BY and `df.where`, we expand the aggregate node below to output these required aggregate functions/grouping columns. In details, when resolving an expression from the parent of an aggregate node:
1. try to resolve columns with `agg.child` and wrap the result with `TempResolvedColumn`.
2. try to resolve subqueries with `agg.child`
3. if the expression is not resolved, return it and wait for other rules to resolve it, such as resolve functions, type coercions, etc.
4. if the expression is resolved, we transform it and push aggregate functions/grouping columns into the aggregate node below.
4.1 the expression may already present in `agg.aggregateExpressions`, we can simply replace the expression with attr ref.
4.2 if a `TempResolvedColumn` is neither inside an aggregate function, or wrap a grouping column, turn it back to an `UnresolvedAttribute`
5. after the main resolution batch, remove all `TempResolvedColumn` and turn them back to `UnresolvedAttribute`.

### Why are the changes needed?

Code cleanup

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing test

Closes #32470 from cloud-fan/agg2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-16 09:52:19 +00:00
Kousuke Saruta 184f65e7c7 [SPARK-35771][SQL] Format year-month intervals using type fields
### What changes were proposed in this pull request?

This PR proposes to format year-month interval to strings using the start and end fields of `YearMonthIntervalType`.

### Why are the changes needed?

 Currently, they are ignored, and any `YearMonthIntervalType` is formatted as `INTERVAL YEAR TO MONTH`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32924 from sarutak/year-month-interval-format.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-16 11:08:02 +03:00
Kousuke Saruta 4530760c40 [SPARK-35774][SQL] Parse any year-month interval types in SQL
### What changes were proposed in this pull request?

This PR extends the parser rules to be able to parse the following types:

* INTERVAL YEAR
* INTERVAL YEAR TO MONTH
* INTERVAL MONTH

### Why are the changes needed?

For ANSI compliance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertion.

Closes #32922 from sarutak/parse-any-year-month.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-16 09:41:57 +03:00
Venkata krishnan Sowrirajan aaa8a80c9d [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes
### What changes were proposed in this pull request?
Cache commonly occurring duplicate Some objects in SQLMetrics by using a Guava cache and reusing the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also with AccumulatorV2 we have seen lot of Some(-1L) and Some(0L) occurrences in a heap dump that is naively interned by having reusing a already constructed Some(-1L) and Some(0L)

To give some context on the impact and the garbage got accumulated, below are the details of the complex spark job which we troubleshooted and figured out the bottlenecks. **tl;dr - In short, major issues were the accumulation of duplicate objects mainly from SQLMetrics.**

Greater than 25% of the 40G driver heap filled with (a very large number of) **duplicate**, immutable objects.

1. Very large number of **duplicate** immutable objects.

- Type of metric is represented by `'scala.Some("sql")'` - which is created for each metric.
- Fixing this reduced memory usage from 4GB to a few bytes.

2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate absence of metric)

- Individually the values are all immutable, but spark sql was creating a new instance each time.
- Intern'ing these resulted in saving ~4.5GB for a 40G heap.

3. Using string interpolation for metric names.

- Interpolation results in creation of a new string object.
- We end up with a very large number of metric names - though the number of unique strings is miniscule.
- ~7.5 GB in the 40 GB heap : which went down to a few KB's when fixed.

### Why are the changes needed?
To reduce overall driver memory footprint which eventually reduces the Full GC pauses.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since these are memory related optimizations, unit tests are not added. These changes are added in our internal platform which made it possible for one of the complex spark job continuously failing to succeed along with other set of optimizations.

Closes #32754 from venkata91/SPARK-35613.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-15 22:02:19 -05:00
Yuming Wang b08cf6e822 [SPARK-35203][SQL] Improve Repartition statistics estimation
### What changes were proposed in this pull request?

This PR improves `Repartition` and `RepartitionByExpr` statistics estimation using child statistics.

### Why are the changes needed?

The current implementation will missing column stat. For example:
```sql
CREATE TABLE t1 USING parquet AS SELECT id % 10 AS key FROM range(100);
ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS;
set spark.sql.cbo.enabled=true;
EXPLAIN COST SELECT key FROM (SELECT key FROM t1 DISTRIBUTE BY key) t GROUP BY key;
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [key#2950L], [key#2950L], Statistics(sizeInBytes=1600.0 B)
+- RepartitionByExpression [key#2950L], Statistics(sizeInBytes=1600.0 B, rowCount=100)
   +- Relation default.t1[key#2950L] parquet, Statistics(sizeInBytes=1600.0 B, rowCount=100)
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [key#2950L], [key#2950L], Statistics(sizeInBytes=160.0 B, rowCount=10)
+- RepartitionByExpression [key#2950L], Statistics(sizeInBytes=1600.0 B, rowCount=100)
   +- Relation default.t1[key#2950L] parquet, Statistics(sizeInBytes=1600.0 B, rowCount=100)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32309 from wangyum/SPARK-35203.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 10:20:13 +09:00
Wenchen Fan 11e96dc843 [SPARK-35669][SQL] Quote the pushed column name only when nested column predicate pushdown is enabled
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/31964

We should only quote the column name when nested column predicate pushdown is enabled, otherwise the data source side may not have the logic to parse the quoted column name and fail. This is not a problem before #31964 , as we don't quote the column name if there is no dot in the name. But #31964 changed it.

### Why are the changes needed?

fix a query failure

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #32807 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 09:43:28 +09:00
Cheng Su 9709ee5ffd [SPARK-35760][SQL] Fix the max rows check for broadcast exchange
### What changes were proposed in this pull request?

This is to fix the maximal allowed number of rows check in `BroadcastExchangeExec`. After https://github.com/apache/spark/pull/27828, the max number of rows is calculated based on max capacity of `BytesToBytesMap` (previous value before the PR is 512000000). This calculation is not accurate as only `UnsafeHashedRelation` is using `BytesToBytesMap`. `LongHashedRelation` (used for broadcast join on key with long data type) has limit of [512000000](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L584), and `BroadcastNestedLoopJoinExec` is not depending on `HashedRelation` at all.

The change is to only specialize the max rows limit when needed. Keep other broadcast case with the previous limit - 512000000.

### Why are the changes needed?

Fix code logic and avoid unexpected behavior.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32911 from c21/broadcast.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 09:36:24 +09:00
Sumeet Gajjar 864ff67746 [SPARK-35429][CORE] Remove commons-httpclient from Hadoop-3.2 profile due to EOL and CVEs
### What changes were proposed in this pull request?

Remove commons-httpclient as a direct dependency for Hadoop-3.2 profile.
Hadoop-2.7 profile distribution still has it, hadoop-client has a compile dependency on commons-httpclient, thus we cannot remove it for Hadoop-2.7 profile.
```
[INFO] +- org.apache.hadoop:hadoop-client:jar:2.7.4:compile
[INFO] |  +- org.apache.hadoop:hadoop-common:jar:2.7.4:compile
[INFO] |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  +- commons-httpclient:commons-httpclient:jar:3.1:compile
```

### Why are the changes needed?

Spark is pulling in commons-httpclient as a dependency directly. commons-httpclient went EOL years ago and there are most likely CVEs not being reported against it, thus we should remove it.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing unittests
- Checked the dependency tree before and after introducing the changes

Before:
```
./build/mvn dependency:tree -Phadoop-3.2 | grep -i "commons-httpclient"
Using `mvn` from path: /usr/bin/mvn
[INFO] +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:provided
```

After
```
./build/mvn dependency:tree | grep -i "commons-httpclient"
Using `mvn` from path: /Users/sumeet.gajjar/cloudera/upstream-spark/build/apache-maven-3.6.3/bin/mvn
```

P.S. Reopening this since [spark upgraded](463daabd5a) its `hive.version` to `2.3.9` which does not have a dependency on `commons-httpclient`.

Closes #32912 from sumeetgajjar/SPARK-35429.

Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-15 14:43:30 -07:00
Max Gekk 61ce8f7649 [SPARK-35680][SQL] Add fields to YearMonthIntervalType
### What changes were proposed in this pull request?
Extend `YearMonthIntervalType` to support interval fields. Valid interval field values:
- 0 (YEAR)
- 1 (MONTH)

After the changes, the following year-month interval types are supported:
1. `YearMonthIntervalType(0, 0)` or `YearMonthIntervalType(YEAR, YEAR)`
2. `YearMonthIntervalType(0, 1)` or `YearMonthIntervalType(YEAR, MONTH)`. **It is the default one**.
3. `YearMonthIntervalType(1, 1)` or `YearMonthIntervalType(MONTH, MONTH)`

Closes #32825

### Why are the changes needed?
In the current implementation, Spark supports only `interval year to month` but the SQL standard allows to specify the start and end fields. The changes will allow to follow ANSI SQL standard more precisely.

### Does this PR introduce _any_ user-facing change?
Yes but `YearMonthIntervalType` has not been released yet.

### How was this patch tested?
By existing test suites.

Closes #32909 from MaxGekk/add-fields-to-YearMonthIntervalType.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 23:08:12 +03:00
Andy Grove 1012967ace [SPARK-35767][SQL] Avoid executing child plan twice in CoalesceExec
### What changes were proposed in this pull request?

`CoalesceExec` needlessly calls `child.execute` twice when it could just call it once and re-use the results. This only happens when `numPartitions == 1`.

### Why are the changes needed?

It is more efficient to execute the child plan once rather than twice.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

There are no functional changes. This is just a performance optimization, so the existing tests should be sufficient to catch any regressions.

Closes #32920 from andygrove/coalesce-exec-executes-twice.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-15 11:59:21 -07:00
Angerszhuuuu 8a02f3a413 [SPARK-35129][SQL] Construct year-month interval column from integral fields
### What changes were proposed in this pull request?
Add a  new function to support construct YearMonthIntervalType from integral fields

### Why are the changes needed?
Add a  new function to support construct YearMonthIntervalType from integral fields

### Does this PR introduce _any_ user-facing change?
Yea user can use `make_ym_interval` to construct TearMonthIntervalType from years/months integral fields

### How was this patch tested?
Added UT

Closes #32645 from AngersZhuuuu/SPARK-35129.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 19:19:41 +03:00
Gengliang Wang c382d4009b [SPARK-35766][SQL][TESTS] Break down CastSuite/AnsiCastSuite into multiple files
### What changes were proposed in this pull request?

Currently, the file CastSuite.scala becomes big: 2000 lines, 2 base classes, 4 test suites.
In my previous work of Timestamp without time zone, I planned to put new test cases in CastSuiteBase, but they were accidentally added in AnsiCastSuiteBase.

This PR is to break the file down into 3 files. It also moves the test cases about timestamp without time zone to the right base class.

### Why are the changes needed?

Make development and review easier.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #32918 from gengliangwang/refactorCastSuite.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-16 00:17:04 +08:00
Tanel Kiis b74260f67f [SPARK-35765][SQL] Distinct aggs are not duplicate sensitive
### What changes were proposed in this pull request?

Extended `RemoveRedundantAggregates` to remove deduplicating aggregations before aggregations that ignore duplicates.

### Why are the changes needed?

Performance imporovement.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Extending existing UT

Closes #32904 from tanelk/SPARK-33122_followup2_distinct_agg.

Authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-15 22:25:04 +09:00
gengjiaan b191d720e1 [SPARK-35056][SQL] Group exception messages in execution/streaming
### What changes were proposed in this pull request?
This PR group exception messages in `sql/core/src/main/scala/org/apache/spark/sql/execution/streaming`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32880 from beliefer/SPARK-35056.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 12:19:52 +03:00
Gengliang Wang 195090afcc [SPARK-35764][SQL] Assign pretty names to TimestampWithoutTZType
### What changes were proposed in this pull request?

In the PR, I propose to override the typeName() method in TimestampWithoutTZType, and assign it a name according to the ANSI SQL standard
![image](https://user-images.githubusercontent.com/1097932/122013859-2cf50680-cdf1-11eb-9fcd-0ec1b59fb5c0.png)

### Why are the changes needed?

To improve Spark SQL user experience, and have readable types in error messages.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.
### How was this patch tested?

Unit test

Closes #32915 from gengliangwang/typename.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 12:15:13 +03:00
Wenchen Fan a50bd8f810 [SPARK-35742][SQL] Expression.semanticEquals should be symmetrical
### What changes were proposed in this pull request?

Currently, there are some expressions that overwrite `semanticEquals`, which makes it not symmetrical. Ideally, expressions should overwrite `canonicalized` instead of `semanticEquals`.

This PR marks `semanticEquals` as final, and implement `canonicalized` for the few expressions that overwrote `semanticEquals` before.

### Why are the changes needed?

To avoid subtle bugs (I haven't found a real bug yet).

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new test

Closes #32885 from cloud-fan/attr.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-15 08:53:04 +00:00
Kousuke Saruta aab0c2bf66 [SPARK-35736][SPARK-35737][SQL][FOLLOWUP] Move a common logic to DayTimeIntervalType
### What changes were proposed in this pull request?

This is a followup PR for SPARK-35736(#32893) and SPARK-35737(#32892).
This PR moves a common logic to `object DayTimeIntervalType`.
That logic is like `val strToFieldIndex = DayTimeIntervalType.dayTimeFields.map(i => DayTimeIntervalType.fieldToString(i) -> (i).toMap`, a `Map` which maps each time unit to the corresponding day-time field index.

### Why are the changes needed?

That logic appeared in the change in SPARK-35736 and SPARK-35737 so it can be a common logic and it's better to avoid the similar logic scattered.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32905 from sarutak/followup-SPARK-35736-35737.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 20:51:18 +03:00
Kousuke Saruta 82af318c31 [SPARK-35748][SS][SQL] Fix StreamingJoinHelper to be able to handle day-time interval
### What changes were proposed in this pull request?

This PR fixes `StreamingJoinHelper` to be able to handle day-time interval.

### Why are the changes needed?

In the current master, `StreamingJoinHelper.getStateValueWatermark` can't handle conditions which contain day-time interval literals.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertions added to `StreamingJoinHlelperSuite`.

Closes #32896 from sarutak/streamingjoinhelper-daytime.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 15:45:36 +03:00
Kousuke Saruta 439e94c171 [SPARK-35737][SQL] Parse day-time interval literals to tightest types
### What changes were proposed in this pull request?

This PR add a feature which parse day-time interval literals to tightest type.

### Why are the changes needed?

To comply with the ANSI behavior.
For example, `INTERVAL '10 20:30' DAY TO MINUTE` should be parsed as `DayTimeIntervalType(DAY, MINUTE)` but not as `DayTimeIntervalType(DAY, SECOND)`.

### Does this PR introduce _any_ user-facing change?

No because `DayTimeIntervalType` will be introduced in `3.2.0`.

### How was this patch tested?

New tests.

Closes #32892 from sarutak/tight-daytime-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 10:06:19 +03:00
Kousuke Saruta 7978fdc97b [SPARK-35736][SQL] Parse any day-time interval types in SQL
### What changes were proposed in this pull request?
This PR adda a feature which allow the parser parse any day-time interval types in SQL.

### Why are the changes needed?
To comply with ANSI standard, we additionally need to support the following types.

* INTERVAL DAY
* INTERVAL DAY TO HOUR
* INTERVAL DAY TO MINUTE
* INTERVAL HOUR
* INTERVAL HOUR TO MINUTE
* INTERVAL HOUR TO SECOND
* INTERVAL MINUTE
* INTERVAL MINUTE TO SECOND
* INTERVAL SECOND

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New tests.

Closes #32893 from sarutak/parse-any-day-time.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 00:13:50 +03:00
Gengliang Wang 6272222bc0 [SPARK-35719][SQL] Support type conversion between timestamp and timestamp without time zone type
### What changes were proposed in this pull request?

1. Extend the Cast expression and support TimestampType in casting to TimestampWithoutTZType.
2. There was a mistake in casting TimestampWithoutTZType as TimestampType in https://github.com/apache/spark/pull/32864. The target value should be `sourceValue - timeZoneOffset` instead of being the same value.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32878 from gengliangwang/timestampToTimestampWithoutTZ.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-13 18:44:24 +03:00
Haiyang Sun 0ba1d3852b [SPARK-35701][SQL] Use copy-on-write semantics for SQLConf registered configurations
### What changes were proposed in this pull request?

Using copy-on-write for `SQLConf.sqlConfEntries` and `SQLConf.staticConfKeys` to reduce contention in concurrent workloads.

### Why are the changes needed?

The global locks used to protect `SQLConf.sqlConfEntries` map and the `SQLConf.staticConfKeys` set can cause significant contention on the `SQLConf` instance in a concurrent setting.

Using copy-on-write versions should reduce the contention given that modifications to the configs are relatively rare.

Closes #32865 from haiyangsun-db/SPARK-35701.

Authored-by: Haiyang Sun <haiyang.sun@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-12 14:59:48 -07:00
Kousuke Saruta 80f7989d9a [SPARK-35734][SQL] Format day-time intervals using type fields
### What changes were proposed in this pull request?

This PR add a feature which formats day-time interval to strings using the start and end fields of `DayTimeIntervalType`.

### Why are the changes needed?

Currently, they are ignored, and any `DayTimeIntervalType` is formatted as `INTERVAL DAY TO SECOND.`

### Does this PR introduce _any_ user-facing change?

Yes. The format of day-time intervals is determined the start and end fields.

### How was this patch tested?

New test.

Closes #32891 from sarutak/interval-format.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-12 21:45:12 +03:00
Chao Sun 9c7250fa73 [SPARK-35321][SQL] Don't register Hive permanent functions when creating Hive client
### What changes were proposed in this pull request?

Instantiate a new Hive client through `Hive.getWithoutRegisterFns(conf, false)` instead of `Hive.get(conf)`, if `Hive` version is >= '2.3.9' (the built-in version).

### Why are the changes needed?

[HIVE-10319](https://issues.apache.org/jira/browse/HIVE-10319) introduced a new API `get_all_functions` which is only supported in Hive 1.3.0/2.0.0 and up. As result, when Spark 3.x talks to a HMS service of version 1.2 or lower, the following error will occur:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3897)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
        ... 96 more
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_all_functions(ThriftHiveMetastore.java:3845)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_all_functions(ThriftHiveMetastore.java:3833)
```

The `get_all_functions` is called only when `doRegisterAllFns` is set to true:
```java
  private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
    conf = c;
    if (doRegisterAllFns) {
      registerAllFunctionsOnce();
    }
  }
```

what this does is to register all Hive permanent functions defined in HMS in Hive's `FunctionRegistry` class, via iterating through results from `get_all_functions`. To Spark, this seems unnecessary as it loads Hive permanent (not built-in) UDF via directly calling the HMS API, i.e., `get_function`. The `FunctionRegistry` is only used in loading Hive's built-in function that is not supported by Spark. At this time, it only applies to `histogram_numeric`.

[HIVE-21563](https://issues.apache.org/jira/browse/HIVE-21563) introduced a new API `getWithoutRegisterFns` which skips the above registration and is available in Hive 2.3.9. Therefore, Spark should adopt it to avoid the cost.

### Does this PR introduce _any_ user-facing change?

Yes with this fix Spark now should be able to talk to HMS server with Hive 1.2.x and lower.

### How was this patch tested?

Manually started a HMS server of Hive version 1.2.2. Without the PR it failed with the above exception. With the PR the error disappeared and I can successfully perform common operations such as create table, create database, list tables, etc.

Closes #32887 from sunchao/SPARK-35321-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-12 10:32:30 +08:00
Liang-Chi Hsieh 703376e8a9 [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value
### What changes were proposed in this pull request?

This patch adds log warn when `keyWithIndexToValue` returns null value in `SymmetricHashJoinStateManager`.

### Why are the changes needed?

Once we get null from state store in SymmetricHashJoinStateManager, it is better to add meaningful logging for the case. It is better for debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32828 from viirya/fix-ss-joinstatemanager-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-12 10:17:09 +09:00
Chendi Xue e958833c72 [SPARK-35396][SQL][TESTS][FOLLOWUP] Add a UT to check if a user-defined cachedBatch is completely released
### What changes were proposed in this pull request?
This PR is used to do add a UT to check if user-defined cached batch are completely released when clearCache called.

### Why are the changes needed?
Add a new UT file RefCountedTestCachedBatchSerializerSuite.scala

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT is added, org.apache.spark.sql.execution.columnar.RefCountedTestCachedBatchSerializerSuite

Closes #32717 from xuechendi/support_manual_close_in_InMemoryRelation.

Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-11 08:20:26 -07:00
Max Gekk d53831ff5c [SPARK-35704][SQL] Add fields to DayTimeIntervalType
### What changes were proposed in this pull request?
Extend DayTimeIntervalType to support interval fields. Valid interval field values:
- 0 (DAY)
- 1 (HOUR)
- 2 (MINUTE)
- 3 (SECOND)

After the changes, the following day-time interval types are supported:
1. `DayTimeIntervalType(0, 0)` or `DayTimeIntervalType(DAY, DAY)`
2. `DayTimeIntervalType(0, 1)` or `DayTimeIntervalType(DAY, HOUR)`
3. `DayTimeIntervalType(0, 2)` or `DayTimeIntervalType(DAY, MINUTE)`
4. `DayTimeIntervalType(0, 3)` or `DayTimeIntervalType(DAY, SECOND)`. **It is the default one**. The second fraction precision is microseconds.
5. `DayTimeIntervalType(1, 1)` or `DayTimeIntervalType(HOUR, HOUR)`
6. `DayTimeIntervalType(1, 2)` or `DayTimeIntervalType(HOUR, MINUTE)`
7. `DayTimeIntervalType(1, 3)` or `DayTimeIntervalType(HOUR, SECOND)`
8. `DayTimeIntervalType(2, 2)` or `DayTimeIntervalType(MINUTE, MINUTE)`
9. `DayTimeIntervalType(2, 3)` or `DayTimeIntervalType(MINUTE, SECOND)`
10. `DayTimeIntervalType(3, 3)` or `DayTimeIntervalType(SECOND, SECOND)`

### Why are the changes needed?
In the current implementation, Spark supports only `interval day to second` but the SQL standard allows to specify the start and end fields. The changes will allow to follow ANSI SQL standard more precisely.

### Does this PR introduce _any_ user-facing change?
Yes but `DayTimeIntervalType` has not been released yet.

### How was this patch tested?
By existing test suites.

Closes #32849 from MaxGekk/day-time-interval-type-units.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-11 16:16:33 +03:00
Tanel Kiis 692dc66c4a [SPARK-35695][SQL] Collect observed metrics from cached and adaptive execution sub-trees
### What changes were proposed in this pull request?

Collect observed metrics from cached and adaptive execution sub-trees.

### Why are the changes needed?

Currently persisting/caching will hide all observed metrics in that sub-tree from reaching the `QueryExecutionListeners`. Adaptive query execution can also hide the metrics from reaching `QueryExecutionListeners`.

### Does this PR introduce _any_ user-facing change?

Bugfix

### How was this patch tested?

New UTs

Closes #32862 from tanelk/SPARK-35695_collect_metrics_persist.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 21:03:08 +08:00
RoryQi 57ce64c511 [SPARK-35706][SQL] Consider making the ':' in STRUCT data type definition optional
### What changes were proposed in this pull request?

The STRUCT type syntax is defined like this:

STRUCT(fieldNmae: fileType [NOT NULL][COMMENT stringLiteral][,.....])

So the field list is nearly the same as a column list

if we could make ':' optional it would be so much cleaner an less proprietary

### Why are the changes needed?
ease of use

### Does this PR introduce _any_ user-facing change?
Yes, you can use Struct type list is nearly the same as a column list

### How was this patch tested?
unit tests

Closes #32858 from jerqi/master.

Authored-by: RoryQi <1242949407@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 12:58:32 +00:00
dgd-contributor 6e1aa15679 [SPARK-35652][SQL] joinWith on two table generated from same one
### What changes were proposed in this pull request?
It seems like spark inner join is performing a cartesian join in self joining using `joinWith`

To produce this issues:
```
val df = spark.range(0,3)
df.joinWith(df, df("id") === df("id")).show()
```

Before this pull request, the result is
+---+---+
 | _1 |  _2 |
+---+---+
|    0 |   0 |
|    0 |   1 |
|    0 |   2 |
|    1 |   0 |
|    1 |   1 |
|    1 |   2 |
|    2 |   0 |
|    2 |   1 |
|    2 |   2 |
+---+---+

The expected result is
+---+---+
 | _1 |  _2 |
+---+---+
|    0 |   0 |
|    1 |   1 |
|    2 |   2 |
+---+---+
### Why are the changes needed?
correctness

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add test

Closes #32863 from dgd-contributor/SPARK-35652_join_and_joinWith_in_seft_joining.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 20:36:50 +08:00
Liang-Chi Hsieh c463472e85 [SPARK-35439][SQL][FOLLOWUP] ExpressionContainmentOrdering should not sort unrelated expressions
### What changes were proposed in this pull request?

This is a followup of #32586. We introduced `ExpressionContainmentOrdering` to sort common expressions according to their parent-child relations. For unrelated expressions, previously the ordering returns -1 which is not correct and can possibly lead to transitivity issue.

### Why are the changes needed?

To fix the possible transitivity issue of `ExpressionContainmentOrdering`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #32870 from viirya/SPARK-35439-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-11 16:13:46 +09:00
Gengliang Wang e9af4576d5 [SPARK-35718][SQL] Support casting of Date to timestamp without time zone type
### What changes were proposed in this pull request?

Extend the Cast expression and support DateType in casting to TimestampWithoutTZType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32873 from gengliangwang/dateToTswtz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 05:41:28 +00:00
Chao Sun e9ccf4a50c [SPARK-35640][SQL] Refactor Parquet vectorized reader to remove duplicated code paths
### What changes were proposed in this pull request?

1. Remove duplicated code in the form of `readXXX` in `VectorizedRleValuesReader`. For instance:
```java
  public void readIntegers(
      int total,
      WritableColumnVector c,
      int rowId,
      int level,
      VectorizedValuesReader data) throws IOException {
    int left = total;
    while (left > 0) {
      if (this.currentCount == 0) this.readNextGroup();
      int n = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          if (currentValue == level) {
            data.readIntegers(n, c, rowId);
          } else {
            c.putNulls(rowId, n);
          }
          break;
        case PACKED:
          for (int i = 0; i < n; ++i) {
            if (currentBuffer[currentBufferIdx++] == level) {
              c.putInt(rowId + i, data.readInteger());
            } else {
              c.putNull(rowId + i);
            }
          }
          break;
      }
      rowId += n;
      left -= n;
      currentCount -= n;
    }
  }
```
and replace with:
```java
  public void readBatch(
       int total,
       int offset,
       WritableColumnVector values,
       int maxDefinitionLevel,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
       int n = Math.min(left, this.currentCount);
       switch (mode) {
         case RLE:
           if (currentValue == maxDefinitionLevel) {
             updater.updateBatch(n, offset, values, valueReader);
           } else {
             values.putNulls(offset, n);
           }
           break;
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
               updater.update(offset + i, values, valueReader);
             } else {
               values.putNull(offset + i);
             }
           }
           break;
       }
       offset += n;
       left -= n;
       currentCount -= n;
     }
   }
```
where the `ParquetVectorUpdater` is type specific, and has different implementations under `updateBatch` and `update`. Together, this also changes code paths handling timestamp types to use the batch read API for decoding definition levels.

2. Similar to the above, this removes code duplication in `VectorizedColumnReader.decodeDictionaryIds`. Now different implementations are under `ParquetVectorUpdater.decodeSingleDictionaryId`.

### Why are the changes needed?

`VectorizedRleValuesReader` and `VectorizedColumnReader` are becoming increasingly harder to maintain, as any change touches the above logic **will need to be replicated in 20+ places**. The issue becomes even more serious when we are going to implement column index (for instance, see how the change [here](https://github.com/apache/spark/pull/32753/files#diff-a01e174e178366aadf07f64ee690d47d343b2ca416a4a2b2ea735887c22d5934R191) has to be replicated multiple times) and complex type support (in progress) for the vectorized path.

In addition, currently dictionary decoding (see `VectorizedColumnReader.decodeDictionaryIds`) and non-dictionary decoding are handled separately, and therefore the same (very complicated) branching logic based on input Spark & Parquet types have to be replicated in two places, which is another burden for code maintenance.

The original intention is for performance. However these days JIT compilers tend to be very effective on this and will inline virtual calls aggressively to eliminate the method invocation costs (see [this](https://shipilev.net/blog/2015/black-magic-method-dispatch/) and [this](http://insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/)). I've also done benchmarks using a modified `DataSourceReadBenchmark` and `DateTimeRebaseBenchmark` and the result is almost exact the same before and after the change. The results can be found [here](https://gist.github.com/sunchao/674afbf942ccc2370bdcfa33efb4471c), and [here's](https://github.com/sunchao/spark/tree/parquet-refactor) the source code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32777 from sunchao/SPARK-35640.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2021-06-11 05:39:43 +00:00
Yuming Wang 463daabd5a [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9
### What changes were proposed in this pull request?

This pr upgrades built-in Hive to 2.3.9. Hive 2.3.9 changes:
- [HIVE-17155] - findConfFile() in HiveConf.java has some issues with the conf path
- [HIVE-24797] - Disable validate default values when parsing Avro schemas
- [HIVE-24608] - Switch back to get_table in HMS client for Hive 2.3.x
- [HIVE-21200] - Vectorization: date column throwing java.lang.UnsupportedOperationException for parquet
- [HIVE-21563] - Improve Table#getEmptyTable performance by disabling registerAllFunctionsOnce
- [HIVE-19228] - Remove commons-httpclient 3.x usage

### Why are the changes needed?

Fix regression caused by AVRO-2035.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32750 from wangyum/SPARK-34512.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-10 20:44:35 -07:00
Gengliang Wang d21ff1318f [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to DateType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32869 from gengliangwang/castToDate.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-10 23:37:02 +03:00
Kousuke Saruta 44b695fbb0 [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions
### What changes were proposed in this pull request?

This PR fixes an issue that `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

### Why are the changes needed?

The current implementation of `CollectMetricsExec` doesn't consider the case it can handle multiple partitions.
Because new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raise an assertion error.
This is a simple reproducible example.
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
	at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
	at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
	at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32786 from sarutak/fix-collectmetricsexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 01:20:35 +08:00
Emil Ejbyfeldt e2e3fe7782 [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
using CatalystTypeConverters.createToScalaConverter. This is how it is
done in MapObjects and that correctly handles Arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]       |-- key: integer
[info]       |-- value: struct (valueContainsNull = true)
[info]       |    |-- i: integer (nullable = false)
[info]       |    |-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString), input[0, map<int,struct<i:int,s:string>>, true], interface scala.collection.immutable.Map
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
[info]   :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
```
So using a map with cases classes for keys or values and using the interpreted path would incorrect deserialize data from the catalyst representation.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-10 09:37:27 -07:00
Gengliang Wang 4180692135 [SPARK-35711][SQL] Support casting of timestamp without time zone to timestamp type
### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to TimestampType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32864 from gengliangwang/castToTimestamp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-10 23:03:52 +08:00
Terry Kim 88f1d82a46 [SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in CheckAnalysis.scala
### What changes were proposed in this pull request?

#31637 removed the usage of `CheckAnalysis.checkAlterTablePartition` but didn't remove the function.

### Why are the changes needed?

To removed an unused function.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32855 from imback82/SPARK-34524-followup.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 12:43:09 +00:00
Fu Chen 5280f02747 [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery
### What changes were proposed in this pull request?

Use `UnresolvedHint.resolved = child.resolved` instead `UnresolvedHint.resolved = false`, then the plan contains `UnresolvedHint` child can be optimized by rule in batch `Resolution`.

For instance, before this pr, the following plan can't be optimized by `ResolveReferences`.
```
!'Project [*]
 +- SubqueryAlias __auto_generated_subquery_name
    +- UnresolvedHint use_hash
       +- Project [42 AS 42#10]
          +- OneRowRelation
```

### Why are the changes needed?

fix hint in subquery bug

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32841 from cfmcgrady/SPARK-35673.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 15:32:10 +08:00
dgd-contributor aa3de40773 [SPARK-35679][SQL] instantToMicros overflow
### Why are the changes needed?
With Long.minValue cast to an instant, secs will be floored in function microsToInstant and cause overflow when multiply with Micros_per_second

```
def microsToInstant(micros: Long): Instant = {
  val secs = Math.floorDiv(micros, MICROS_PER_SECOND)
  // Unfolded Math.floorMod(us, MICROS_PER_SECOND) to reuse the result of
  // the above calculation of `secs` via `floorDiv`.
  val mos = micros - secs * MICROS_PER_SECOND  <- it will overflow here
  Instant.ofEpochSecond(secs, mos * NANOS_PER_MICROS)
}
```

But the overflow is acceptable because it won't produce any change to the result

However, when convert the instant back to micro value, it will raise Overflow Error

```
def instantToMicros(instant: Instant): Long = {
  val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) <- It overflow here
  val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
  result
}
```

Code to reproduce this error
```
instantToMicros(microToInstant(Long.MinValue))
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test added

Closes #32839 from dgd-contributor/SPARK-35679_instantToMicro.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-10 08:08:51 +03:00
ulysses-you 8dde20a993 [SPARK-35675][SQL] EnsureRequirements remove shuffle should respect PartitioningCollection
### What changes were proposed in this pull request?

Add `PartitioningCollection` in EnsureRequirements during remove shuffle.

### Why are the changes needed?

Currently `EnsureRequirements` only check if child has semantic equal `HashPartitioning` and remove
redundant shuffle. We can enhance this case using `PartitioningCollection`.

### Does this PR introduce _any_ user-facing change?

Yes, plan might be changed.

### How was this patch tested?

Add test.

Closes #32815 from ulysses-you/shuffle-node.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-06-10 13:03:47 +08:00
Linhong Liu 87d2ffbbcf [MINOR][SQL] No need to normolize name for built-in functions
### What changes were proposed in this pull request?
Add an `internalRegisterFunction` for the built-in function registry. So that
we can skip the unnecessary function normalization.

### Why are the changes needed?
small refactor

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing ut

Closes #32842 from linhongliu-db/function-refactor.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 04:35:26 +00:00
Kousuke Saruta 7e99b65295 [SPARK-35194][SQL][FOLLOWUP] Change Seq to collections.Seq in NestedColumnAliasing to work with Scala 2.13
### What changes were proposed in this pull request?

This PR changes an occurrence of `Seq` to `collections.Seq` in `NestedColumnAliasing`.

### Why are the changes needed?

In the current master, `NestedColumnAliasing` doesn't work with Scala 2.13 and the relevant tests fail.
The following are examples.

* `NestedColumnAliasingSuite`
* Subclasses of `SchemaPruningSuite`
* `ColumnPruningSuite`

```
NestedColumnAliasingSuite:
[info] - Pushing a single nested field projection *** FAILED *** (14 milliseconds)
[info]   scala.MatchError: (none#211451,ArrayBuffer(name#211451.middle)) (of class scala.Tuple2)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.$anonfun$getAttributeToExtractValues$5(NestedColumnAliasing.scala:258)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap(StrictOptimizedMapOps.scala:31)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap$(StrictOptimizedMapOps.scala:30)
[info]   at scala.collection.immutable.HashMap.flatMap(HashMap.scala:39)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.getAttributeToExtractValues(NestedColumnAliasing.scala:258)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran tests mentioned above and all passed with Scala 2.13.

Closes #32848 from sarutak/followup-SPARK-35194-2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 02:14:40 +00:00