Commit graph

11346 commits

Author SHA1 Message Date
Chao Sun 506ef9aad7 [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1
### What changes were proposed in this pull request?

This upgrade default Hadoop version from 3.2.1 to 3.3.1. The changes here are simply update the version number and dependency file.

### Why are the changes needed?

Hadoop 3.3.1 just came out, which comes with many client-side improvements such as for S3A/ABFS (20% faster when accessing S3). These are important for users who want to use Spark in a cloud environment.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing unit tests in Spark
- Manually tested using my S3 bucket for event log dir:
```
bin/spark-shell \
  -c spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY_ID \
  -c spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY \
  -c spark.eventLog.enabled=true
  -c spark.eventLog.dir=s3a://<my-bucket>
```
- Manually tested against docker-based YARN dev cluster, by running `SparkPi`.

Closes #30135 from sunchao/SPARK-29250.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-16 13:28:07 -07:00
YangJie 87bf6b0ea4 [SPARK-35556][SQL] Remove close HiveClient's SessionState
### What changes were proposed in this pull request?

It will not generate `tmpOutputFile`, `tmpErrOutputFile` and `sessionDirs` since [SPARK-35286](https://issues.apache.org/jira/browse/SPARK-35286). So we can remove `HiveClientImpl.closeState` to avoid these exceptions:
```
java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File
```

### Why are the changes needed?

1. Avoid incompatible exceptions.
2. Remove useless code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the GitHub Action
- Manual test:

Execute

```
mvn clean install -DskipTests -pl sql/hive -am
mvn test -pl sql/hive -DwildcardSuites=org.apache.spark.sql.hive.client.VersionsSuite -Dtest=none
```

**Before**

```
Run completed in 17 minutes, 18 seconds.
Total number of tests run: 867
Suites: completed 2, aborted 0
Tests: succeeded 867, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
15:04:02.407 WARN org.apache.hadoop.hive.metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
15:04:02.408 WARN org.apache.hadoop.hive.metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore yangjie010.2.30.21
15:04:02.441 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
15:04:03.140 ERROR org.apache.spark.util.Utils: Uncaught exception in thread shutdown-hook-0
java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$closeState$1(HiveClientImpl.scala:168)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:312)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:243)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:242)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:292)
	at org.apache.spark.sql.hive.client.HiveClientImpl.closeState(HiveClientImpl.scala:158)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$new$1(HiveClientImpl.scala:175)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1994)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
15:04:03.141 WARN org.apache.hadoop.util.ShutdownHookManager: ShutdownHook '$anon$2' failed, java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.session.SessionState.getTmpErrOutputFile()Ljava/io/File;
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$closeState$1(HiveClientImpl.scala:168)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:312)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:243)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:242)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:292)
	at org.apache.spark.sql.hive.client.HiveClientImpl.closeState(HiveClientImpl.scala:158)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$new$1(HiveClientImpl.scala:175)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1994)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

**After**

```
Run completed in 11 minutes, 41 seconds.
Total number of tests run: 867
Suites: completed 2, aborted 0
Tests: succeeded 867, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
```

Closes #32693 from LuciferYang/SPARK-35556.

Lead-authored-by: YangJie <yangjie01@baidu.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-16 23:30:30 +08:00
Wenchen Fan a2961ddfdf [SPARK-35712][SQL] Simplify ResolveAggregateFunctions
### What changes were proposed in this pull request?

Currently, `ResolveAggregateFunctions` is a complicated rule that recursively calls the entire analyzer to resolve aggregate functions in parent nodes of aggregate. It's kind of necessary as we need to do many things to identify the aggregate function and push it down to the aggregate node: resolve columns as if they are in the aggregate node, resolve functions, apply type coercion, etc. However, this is overly complicated and it's hard to fully understand how the resolution is done there. It also leads to hacks such as the [char/varchar hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2396-L2401), [subquery hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2274-L2277), [grouping function hack](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2465-L2467), etc.

This PR simplifies the `ResolveAggregateFunctions` rule and clarifies the resolution logic. To resolve aggregate functions/grouping columns in HAVING, ORDER BY and `df.where`, we expand the aggregate node below to output these required aggregate functions/grouping columns. In details, when resolving an expression from the parent of an aggregate node:
1. try to resolve columns with `agg.child` and wrap the result with `TempResolvedColumn`.
2. try to resolve subqueries with `agg.child`
3. if the expression is not resolved, return it and wait for other rules to resolve it, such as resolve functions, type coercions, etc.
4. if the expression is resolved, we transform it and push aggregate functions/grouping columns into the aggregate node below.
4.1 the expression may already present in `agg.aggregateExpressions`, we can simply replace the expression with attr ref.
4.2 if a `TempResolvedColumn` is neither inside an aggregate function, or wrap a grouping column, turn it back to an `UnresolvedAttribute`
5. after the main resolution batch, remove all `TempResolvedColumn` and turn them back to `UnresolvedAttribute`.

### Why are the changes needed?

Code cleanup

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing test

Closes #32470 from cloud-fan/agg2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-16 09:52:19 +00:00
Kousuke Saruta 184f65e7c7 [SPARK-35771][SQL] Format year-month intervals using type fields
### What changes were proposed in this pull request?

This PR proposes to format year-month interval to strings using the start and end fields of `YearMonthIntervalType`.

### Why are the changes needed?

 Currently, they are ignored, and any `YearMonthIntervalType` is formatted as `INTERVAL YEAR TO MONTH`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32924 from sarutak/year-month-interval-format.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-16 11:08:02 +03:00
Kousuke Saruta 4530760c40 [SPARK-35774][SQL] Parse any year-month interval types in SQL
### What changes were proposed in this pull request?

This PR extends the parser rules to be able to parse the following types:

* INTERVAL YEAR
* INTERVAL YEAR TO MONTH
* INTERVAL MONTH

### Why are the changes needed?

For ANSI compliance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertion.

Closes #32922 from sarutak/parse-any-year-month.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-16 09:41:57 +03:00
Venkata krishnan Sowrirajan aaa8a80c9d [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes
### What changes were proposed in this pull request?
Cache commonly occurring duplicate Some objects in SQLMetrics by using a Guava cache and reusing the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also with AccumulatorV2 we have seen lot of Some(-1L) and Some(0L) occurrences in a heap dump that is naively interned by having reusing a already constructed Some(-1L) and Some(0L)

To give some context on the impact and the garbage got accumulated, below are the details of the complex spark job which we troubleshooted and figured out the bottlenecks. **tl;dr - In short, major issues were the accumulation of duplicate objects mainly from SQLMetrics.**

Greater than 25% of the 40G driver heap filled with (a very large number of) **duplicate**, immutable objects.

1. Very large number of **duplicate** immutable objects.

- Type of metric is represented by `'scala.Some("sql")'` - which is created for each metric.
- Fixing this reduced memory usage from 4GB to a few bytes.

2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate absence of metric)

- Individually the values are all immutable, but spark sql was creating a new instance each time.
- Intern'ing these resulted in saving ~4.5GB for a 40G heap.

3. Using string interpolation for metric names.

- Interpolation results in creation of a new string object.
- We end up with a very large number of metric names - though the number of unique strings is miniscule.
- ~7.5 GB in the 40 GB heap : which went down to a few KB's when fixed.

### Why are the changes needed?
To reduce overall driver memory footprint which eventually reduces the Full GC pauses.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since these are memory related optimizations, unit tests are not added. These changes are added in our internal platform which made it possible for one of the complex spark job continuously failing to succeed along with other set of optimizations.

Closes #32754 from venkata91/SPARK-35613.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-15 22:02:19 -05:00
Yuming Wang b08cf6e822 [SPARK-35203][SQL] Improve Repartition statistics estimation
### What changes were proposed in this pull request?

This PR improves `Repartition` and `RepartitionByExpr` statistics estimation using child statistics.

### Why are the changes needed?

The current implementation will missing column stat. For example:
```sql
CREATE TABLE t1 USING parquet AS SELECT id % 10 AS key FROM range(100);
ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS;
set spark.sql.cbo.enabled=true;
EXPLAIN COST SELECT key FROM (SELECT key FROM t1 DISTRIBUTE BY key) t GROUP BY key;
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [key#2950L], [key#2950L], Statistics(sizeInBytes=1600.0 B)
+- RepartitionByExpression [key#2950L], Statistics(sizeInBytes=1600.0 B, rowCount=100)
   +- Relation default.t1[key#2950L] parquet, Statistics(sizeInBytes=1600.0 B, rowCount=100)
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [key#2950L], [key#2950L], Statistics(sizeInBytes=160.0 B, rowCount=10)
+- RepartitionByExpression [key#2950L], Statistics(sizeInBytes=1600.0 B, rowCount=100)
   +- Relation default.t1[key#2950L] parquet, Statistics(sizeInBytes=1600.0 B, rowCount=100)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32309 from wangyum/SPARK-35203.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 10:20:13 +09:00
Wenchen Fan 11e96dc843 [SPARK-35669][SQL] Quote the pushed column name only when nested column predicate pushdown is enabled
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/31964

We should only quote the column name when nested column predicate pushdown is enabled, otherwise the data source side may not have the logic to parse the quoted column name and fail. This is not a problem before #31964 , as we don't quote the column name if there is no dot in the name. But #31964 changed it.

### Why are the changes needed?

fix a query failure

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #32807 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 09:43:28 +09:00
Cheng Su 9709ee5ffd [SPARK-35760][SQL] Fix the max rows check for broadcast exchange
### What changes were proposed in this pull request?

This is to fix the maximal allowed number of rows check in `BroadcastExchangeExec`. After https://github.com/apache/spark/pull/27828, the max number of rows is calculated based on max capacity of `BytesToBytesMap` (previous value before the PR is 512000000). This calculation is not accurate as only `UnsafeHashedRelation` is using `BytesToBytesMap`. `LongHashedRelation` (used for broadcast join on key with long data type) has limit of [512000000](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L584), and `BroadcastNestedLoopJoinExec` is not depending on `HashedRelation` at all.

The change is to only specialize the max rows limit when needed. Keep other broadcast case with the previous limit - 512000000.

### Why are the changes needed?

Fix code logic and avoid unexpected behavior.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32911 from c21/broadcast.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-16 09:36:24 +09:00
Sumeet Gajjar 864ff67746 [SPARK-35429][CORE] Remove commons-httpclient from Hadoop-3.2 profile due to EOL and CVEs
### What changes were proposed in this pull request?

Remove commons-httpclient as a direct dependency for Hadoop-3.2 profile.
Hadoop-2.7 profile distribution still has it, hadoop-client has a compile dependency on commons-httpclient, thus we cannot remove it for Hadoop-2.7 profile.
```
[INFO] +- org.apache.hadoop:hadoop-client:jar:2.7.4:compile
[INFO] |  +- org.apache.hadoop:hadoop-common:jar:2.7.4:compile
[INFO] |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  +- commons-httpclient:commons-httpclient:jar:3.1:compile
```

### Why are the changes needed?

Spark is pulling in commons-httpclient as a dependency directly. commons-httpclient went EOL years ago and there are most likely CVEs not being reported against it, thus we should remove it.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing unittests
- Checked the dependency tree before and after introducing the changes

Before:
```
./build/mvn dependency:tree -Phadoop-3.2 | grep -i "commons-httpclient"
Using `mvn` from path: /usr/bin/mvn
[INFO] +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:provided
```

After
```
./build/mvn dependency:tree | grep -i "commons-httpclient"
Using `mvn` from path: /Users/sumeet.gajjar/cloudera/upstream-spark/build/apache-maven-3.6.3/bin/mvn
```

P.S. Reopening this since [spark upgraded](463daabd5a) its `hive.version` to `2.3.9` which does not have a dependency on `commons-httpclient`.

Closes #32912 from sumeetgajjar/SPARK-35429.

Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-15 14:43:30 -07:00
Max Gekk 61ce8f7649 [SPARK-35680][SQL] Add fields to YearMonthIntervalType
### What changes were proposed in this pull request?
Extend `YearMonthIntervalType` to support interval fields. Valid interval field values:
- 0 (YEAR)
- 1 (MONTH)

After the changes, the following year-month interval types are supported:
1. `YearMonthIntervalType(0, 0)` or `YearMonthIntervalType(YEAR, YEAR)`
2. `YearMonthIntervalType(0, 1)` or `YearMonthIntervalType(YEAR, MONTH)`. **It is the default one**.
3. `YearMonthIntervalType(1, 1)` or `YearMonthIntervalType(MONTH, MONTH)`

Closes #32825

### Why are the changes needed?
In the current implementation, Spark supports only `interval year to month` but the SQL standard allows to specify the start and end fields. The changes will allow to follow ANSI SQL standard more precisely.

### Does this PR introduce _any_ user-facing change?
Yes but `YearMonthIntervalType` has not been released yet.

### How was this patch tested?
By existing test suites.

Closes #32909 from MaxGekk/add-fields-to-YearMonthIntervalType.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 23:08:12 +03:00
Andy Grove 1012967ace [SPARK-35767][SQL] Avoid executing child plan twice in CoalesceExec
### What changes were proposed in this pull request?

`CoalesceExec` needlessly calls `child.execute` twice when it could just call it once and re-use the results. This only happens when `numPartitions == 1`.

### Why are the changes needed?

It is more efficient to execute the child plan once rather than twice.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

There are no functional changes. This is just a performance optimization, so the existing tests should be sufficient to catch any regressions.

Closes #32920 from andygrove/coalesce-exec-executes-twice.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-06-15 11:59:21 -07:00
Angerszhuuuu 8a02f3a413 [SPARK-35129][SQL] Construct year-month interval column from integral fields
### What changes were proposed in this pull request?
Add a  new function to support construct YearMonthIntervalType from integral fields

### Why are the changes needed?
Add a  new function to support construct YearMonthIntervalType from integral fields

### Does this PR introduce _any_ user-facing change?
Yea user can use `make_ym_interval` to construct TearMonthIntervalType from years/months integral fields

### How was this patch tested?
Added UT

Closes #32645 from AngersZhuuuu/SPARK-35129.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 19:19:41 +03:00
Gengliang Wang c382d4009b [SPARK-35766][SQL][TESTS] Break down CastSuite/AnsiCastSuite into multiple files
### What changes were proposed in this pull request?

Currently, the file CastSuite.scala becomes big: 2000 lines, 2 base classes, 4 test suites.
In my previous work of Timestamp without time zone, I planned to put new test cases in CastSuiteBase, but they were accidentally added in AnsiCastSuiteBase.

This PR is to break the file down into 3 files. It also moves the test cases about timestamp without time zone to the right base class.

### Why are the changes needed?

Make development and review easier.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #32918 from gengliangwang/refactorCastSuite.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-16 00:17:04 +08:00
Tanel Kiis b74260f67f [SPARK-35765][SQL] Distinct aggs are not duplicate sensitive
### What changes were proposed in this pull request?

Extended `RemoveRedundantAggregates` to remove deduplicating aggregations before aggregations that ignore duplicates.

### Why are the changes needed?

Performance imporovement.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Extending existing UT

Closes #32904 from tanelk/SPARK-33122_followup2_distinct_agg.

Authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-15 22:25:04 +09:00
gengjiaan b191d720e1 [SPARK-35056][SQL] Group exception messages in execution/streaming
### What changes were proposed in this pull request?
This PR group exception messages in `sql/core/src/main/scala/org/apache/spark/sql/execution/streaming`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32880 from beliefer/SPARK-35056.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 12:19:52 +03:00
Gengliang Wang 195090afcc [SPARK-35764][SQL] Assign pretty names to TimestampWithoutTZType
### What changes were proposed in this pull request?

In the PR, I propose to override the typeName() method in TimestampWithoutTZType, and assign it a name according to the ANSI SQL standard
![image](https://user-images.githubusercontent.com/1097932/122013859-2cf50680-cdf1-11eb-9fcd-0ec1b59fb5c0.png)

### Why are the changes needed?

To improve Spark SQL user experience, and have readable types in error messages.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.
### How was this patch tested?

Unit test

Closes #32915 from gengliangwang/typename.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-15 12:15:13 +03:00
Wenchen Fan a50bd8f810 [SPARK-35742][SQL] Expression.semanticEquals should be symmetrical
### What changes were proposed in this pull request?

Currently, there are some expressions that overwrite `semanticEquals`, which makes it not symmetrical. Ideally, expressions should overwrite `canonicalized` instead of `semanticEquals`.

This PR marks `semanticEquals` as final, and implement `canonicalized` for the few expressions that overwrote `semanticEquals` before.

### Why are the changes needed?

To avoid subtle bugs (I haven't found a real bug yet).

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new test

Closes #32885 from cloud-fan/attr.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-15 08:53:04 +00:00
Kousuke Saruta aab0c2bf66 [SPARK-35736][SPARK-35737][SQL][FOLLOWUP] Move a common logic to DayTimeIntervalType
### What changes were proposed in this pull request?

This is a followup PR for SPARK-35736(#32893) and SPARK-35737(#32892).
This PR moves a common logic to `object DayTimeIntervalType`.
That logic is like `val strToFieldIndex = DayTimeIntervalType.dayTimeFields.map(i => DayTimeIntervalType.fieldToString(i) -> (i).toMap`, a `Map` which maps each time unit to the corresponding day-time field index.

### Why are the changes needed?

That logic appeared in the change in SPARK-35736 and SPARK-35737 so it can be a common logic and it's better to avoid the similar logic scattered.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32905 from sarutak/followup-SPARK-35736-35737.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 20:51:18 +03:00
Kousuke Saruta 82af318c31 [SPARK-35748][SS][SQL] Fix StreamingJoinHelper to be able to handle day-time interval
### What changes were proposed in this pull request?

This PR fixes `StreamingJoinHelper` to be able to handle day-time interval.

### Why are the changes needed?

In the current master, `StreamingJoinHelper.getStateValueWatermark` can't handle conditions which contain day-time interval literals.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertions added to `StreamingJoinHlelperSuite`.

Closes #32896 from sarutak/streamingjoinhelper-daytime.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 15:45:36 +03:00
Kousuke Saruta 439e94c171 [SPARK-35737][SQL] Parse day-time interval literals to tightest types
### What changes were proposed in this pull request?

This PR add a feature which parse day-time interval literals to tightest type.

### Why are the changes needed?

To comply with the ANSI behavior.
For example, `INTERVAL '10 20:30' DAY TO MINUTE` should be parsed as `DayTimeIntervalType(DAY, MINUTE)` but not as `DayTimeIntervalType(DAY, SECOND)`.

### Does this PR introduce _any_ user-facing change?

No because `DayTimeIntervalType` will be introduced in `3.2.0`.

### How was this patch tested?

New tests.

Closes #32892 from sarutak/tight-daytime-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 10:06:19 +03:00
Kousuke Saruta 7978fdc97b [SPARK-35736][SQL] Parse any day-time interval types in SQL
### What changes were proposed in this pull request?
This PR adda a feature which allow the parser parse any day-time interval types in SQL.

### Why are the changes needed?
To comply with ANSI standard, we additionally need to support the following types.

* INTERVAL DAY
* INTERVAL DAY TO HOUR
* INTERVAL DAY TO MINUTE
* INTERVAL HOUR
* INTERVAL HOUR TO MINUTE
* INTERVAL HOUR TO SECOND
* INTERVAL MINUTE
* INTERVAL MINUTE TO SECOND
* INTERVAL SECOND

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New tests.

Closes #32893 from sarutak/parse-any-day-time.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-14 00:13:50 +03:00
Gengliang Wang 6272222bc0 [SPARK-35719][SQL] Support type conversion between timestamp and timestamp without time zone type
### What changes were proposed in this pull request?

1. Extend the Cast expression and support TimestampType in casting to TimestampWithoutTZType.
2. There was a mistake in casting TimestampWithoutTZType as TimestampType in https://github.com/apache/spark/pull/32864. The target value should be `sourceValue - timeZoneOffset` instead of being the same value.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32878 from gengliangwang/timestampToTimestampWithoutTZ.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-13 18:44:24 +03:00
Haiyang Sun 0ba1d3852b [SPARK-35701][SQL] Use copy-on-write semantics for SQLConf registered configurations
### What changes were proposed in this pull request?

Using copy-on-write for `SQLConf.sqlConfEntries` and `SQLConf.staticConfKeys` to reduce contention in concurrent workloads.

### Why are the changes needed?

The global locks used to protect `SQLConf.sqlConfEntries` map and the `SQLConf.staticConfKeys` set can cause significant contention on the `SQLConf` instance in a concurrent setting.

Using copy-on-write versions should reduce the contention given that modifications to the configs are relatively rare.

Closes #32865 from haiyangsun-db/SPARK-35701.

Authored-by: Haiyang Sun <haiyang.sun@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-12 14:59:48 -07:00
Kousuke Saruta 80f7989d9a [SPARK-35734][SQL] Format day-time intervals using type fields
### What changes were proposed in this pull request?

This PR add a feature which formats day-time interval to strings using the start and end fields of `DayTimeIntervalType`.

### Why are the changes needed?

Currently, they are ignored, and any `DayTimeIntervalType` is formatted as `INTERVAL DAY TO SECOND.`

### Does this PR introduce _any_ user-facing change?

Yes. The format of day-time intervals is determined the start and end fields.

### How was this patch tested?

New test.

Closes #32891 from sarutak/interval-format.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-12 21:45:12 +03:00
Chao Sun 9c7250fa73 [SPARK-35321][SQL] Don't register Hive permanent functions when creating Hive client
### What changes were proposed in this pull request?

Instantiate a new Hive client through `Hive.getWithoutRegisterFns(conf, false)` instead of `Hive.get(conf)`, if `Hive` version is >= '2.3.9' (the built-in version).

### Why are the changes needed?

[HIVE-10319](https://issues.apache.org/jira/browse/HIVE-10319) introduced a new API `get_all_functions` which is only supported in Hive 1.3.0/2.0.0 and up. As result, when Spark 3.x talks to a HMS service of version 1.2 or lower, the following error will occur:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3897)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
        ... 96 more
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_all_functions(ThriftHiveMetastore.java:3845)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_all_functions(ThriftHiveMetastore.java:3833)
```

The `get_all_functions` is called only when `doRegisterAllFns` is set to true:
```java
  private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
    conf = c;
    if (doRegisterAllFns) {
      registerAllFunctionsOnce();
    }
  }
```

what this does is to register all Hive permanent functions defined in HMS in Hive's `FunctionRegistry` class, via iterating through results from `get_all_functions`. To Spark, this seems unnecessary as it loads Hive permanent (not built-in) UDF via directly calling the HMS API, i.e., `get_function`. The `FunctionRegistry` is only used in loading Hive's built-in function that is not supported by Spark. At this time, it only applies to `histogram_numeric`.

[HIVE-21563](https://issues.apache.org/jira/browse/HIVE-21563) introduced a new API `getWithoutRegisterFns` which skips the above registration and is available in Hive 2.3.9. Therefore, Spark should adopt it to avoid the cost.

### Does this PR introduce _any_ user-facing change?

Yes with this fix Spark now should be able to talk to HMS server with Hive 1.2.x and lower.

### How was this patch tested?

Manually started a HMS server of Hive version 1.2.2. Without the PR it failed with the above exception. With the PR the error disappeared and I can successfully perform common operations such as create table, create database, list tables, etc.

Closes #32887 from sunchao/SPARK-35321-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-06-12 10:32:30 +08:00
Liang-Chi Hsieh 703376e8a9 [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value
### What changes were proposed in this pull request?

This patch adds log warn when `keyWithIndexToValue` returns null value in `SymmetricHashJoinStateManager`.

### Why are the changes needed?

Once we get null from state store in SymmetricHashJoinStateManager, it is better to add meaningful logging for the case. It is better for debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32828 from viirya/fix-ss-joinstatemanager-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-12 10:17:09 +09:00
Chendi Xue e958833c72 [SPARK-35396][SQL][TESTS][FOLLOWUP] Add a UT to check if a user-defined cachedBatch is completely released
### What changes were proposed in this pull request?
This PR is used to do add a UT to check if user-defined cached batch are completely released when clearCache called.

### Why are the changes needed?
Add a new UT file RefCountedTestCachedBatchSerializerSuite.scala

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT is added, org.apache.spark.sql.execution.columnar.RefCountedTestCachedBatchSerializerSuite

Closes #32717 from xuechendi/support_manual_close_in_InMemoryRelation.

Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-11 08:20:26 -07:00
Max Gekk d53831ff5c [SPARK-35704][SQL] Add fields to DayTimeIntervalType
### What changes were proposed in this pull request?
Extend DayTimeIntervalType to support interval fields. Valid interval field values:
- 0 (DAY)
- 1 (HOUR)
- 2 (MINUTE)
- 3 (SECOND)

After the changes, the following day-time interval types are supported:
1. `DayTimeIntervalType(0, 0)` or `DayTimeIntervalType(DAY, DAY)`
2. `DayTimeIntervalType(0, 1)` or `DayTimeIntervalType(DAY, HOUR)`
3. `DayTimeIntervalType(0, 2)` or `DayTimeIntervalType(DAY, MINUTE)`
4. `DayTimeIntervalType(0, 3)` or `DayTimeIntervalType(DAY, SECOND)`. **It is the default one**. The second fraction precision is microseconds.
5. `DayTimeIntervalType(1, 1)` or `DayTimeIntervalType(HOUR, HOUR)`
6. `DayTimeIntervalType(1, 2)` or `DayTimeIntervalType(HOUR, MINUTE)`
7. `DayTimeIntervalType(1, 3)` or `DayTimeIntervalType(HOUR, SECOND)`
8. `DayTimeIntervalType(2, 2)` or `DayTimeIntervalType(MINUTE, MINUTE)`
9. `DayTimeIntervalType(2, 3)` or `DayTimeIntervalType(MINUTE, SECOND)`
10. `DayTimeIntervalType(3, 3)` or `DayTimeIntervalType(SECOND, SECOND)`

### Why are the changes needed?
In the current implementation, Spark supports only `interval day to second` but the SQL standard allows to specify the start and end fields. The changes will allow to follow ANSI SQL standard more precisely.

### Does this PR introduce _any_ user-facing change?
Yes but `DayTimeIntervalType` has not been released yet.

### How was this patch tested?
By existing test suites.

Closes #32849 from MaxGekk/day-time-interval-type-units.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-11 16:16:33 +03:00
Tanel Kiis 692dc66c4a [SPARK-35695][SQL] Collect observed metrics from cached and adaptive execution sub-trees
### What changes were proposed in this pull request?

Collect observed metrics from cached and adaptive execution sub-trees.

### Why are the changes needed?

Currently persisting/caching will hide all observed metrics in that sub-tree from reaching the `QueryExecutionListeners`. Adaptive query execution can also hide the metrics from reaching `QueryExecutionListeners`.

### Does this PR introduce _any_ user-facing change?

Bugfix

### How was this patch tested?

New UTs

Closes #32862 from tanelk/SPARK-35695_collect_metrics_persist.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 21:03:08 +08:00
RoryQi 57ce64c511 [SPARK-35706][SQL] Consider making the ':' in STRUCT data type definition optional
### What changes were proposed in this pull request?

The STRUCT type syntax is defined like this:

STRUCT(fieldNmae: fileType [NOT NULL][COMMENT stringLiteral][,.....])

So the field list is nearly the same as a column list

if we could make ':' optional it would be so much cleaner an less proprietary

### Why are the changes needed?
ease of use

### Does this PR introduce _any_ user-facing change?
Yes, you can use Struct type list is nearly the same as a column list

### How was this patch tested?
unit tests

Closes #32858 from jerqi/master.

Authored-by: RoryQi <1242949407@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 12:58:32 +00:00
dgd-contributor 6e1aa15679 [SPARK-35652][SQL] joinWith on two table generated from same one
### What changes were proposed in this pull request?
It seems like spark inner join is performing a cartesian join in self joining using `joinWith`

To produce this issues:
```
val df = spark.range(0,3)
df.joinWith(df, df("id") === df("id")).show()
```

Before this pull request, the result is
+---+---+
 | _1 |  _2 |
+---+---+
|    0 |   0 |
|    0 |   1 |
|    0 |   2 |
|    1 |   0 |
|    1 |   1 |
|    1 |   2 |
|    2 |   0 |
|    2 |   1 |
|    2 |   2 |
+---+---+

The expected result is
+---+---+
 | _1 |  _2 |
+---+---+
|    0 |   0 |
|    1 |   1 |
|    2 |   2 |
+---+---+
### Why are the changes needed?
correctness

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add test

Closes #32863 from dgd-contributor/SPARK-35652_join_and_joinWith_in_seft_joining.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 20:36:50 +08:00
Liang-Chi Hsieh c463472e85 [SPARK-35439][SQL][FOLLOWUP] ExpressionContainmentOrdering should not sort unrelated expressions
### What changes were proposed in this pull request?

This is a followup of #32586. We introduced `ExpressionContainmentOrdering` to sort common expressions according to their parent-child relations. For unrelated expressions, previously the ordering returns -1 which is not correct and can possibly lead to transitivity issue.

### Why are the changes needed?

To fix the possible transitivity issue of `ExpressionContainmentOrdering`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #32870 from viirya/SPARK-35439-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-06-11 16:13:46 +09:00
Gengliang Wang e9af4576d5 [SPARK-35718][SQL] Support casting of Date to timestamp without time zone type
### What changes were proposed in this pull request?

Extend the Cast expression and support DateType in casting to TimestampWithoutTZType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32873 from gengliangwang/dateToTswtz.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 05:41:28 +00:00
Chao Sun e9ccf4a50c [SPARK-35640][SQL] Refactor Parquet vectorized reader to remove duplicated code paths
### What changes were proposed in this pull request?

1. Remove duplicated code in the form of `readXXX` in `VectorizedRleValuesReader`. For instance:
```java
  public void readIntegers(
      int total,
      WritableColumnVector c,
      int rowId,
      int level,
      VectorizedValuesReader data) throws IOException {
    int left = total;
    while (left > 0) {
      if (this.currentCount == 0) this.readNextGroup();
      int n = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          if (currentValue == level) {
            data.readIntegers(n, c, rowId);
          } else {
            c.putNulls(rowId, n);
          }
          break;
        case PACKED:
          for (int i = 0; i < n; ++i) {
            if (currentBuffer[currentBufferIdx++] == level) {
              c.putInt(rowId + i, data.readInteger());
            } else {
              c.putNull(rowId + i);
            }
          }
          break;
      }
      rowId += n;
      left -= n;
      currentCount -= n;
    }
  }
```
and replace with:
```java
  public void readBatch(
       int total,
       int offset,
       WritableColumnVector values,
       int maxDefinitionLevel,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
       int n = Math.min(left, this.currentCount);
       switch (mode) {
         case RLE:
           if (currentValue == maxDefinitionLevel) {
             updater.updateBatch(n, offset, values, valueReader);
           } else {
             values.putNulls(offset, n);
           }
           break;
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
               updater.update(offset + i, values, valueReader);
             } else {
               values.putNull(offset + i);
             }
           }
           break;
       }
       offset += n;
       left -= n;
       currentCount -= n;
     }
   }
```
where the `ParquetVectorUpdater` is type specific, and has different implementations under `updateBatch` and `update`. Together, this also changes code paths handling timestamp types to use the batch read API for decoding definition levels.

2. Similar to the above, this removes code duplication in `VectorizedColumnReader.decodeDictionaryIds`. Now different implementations are under `ParquetVectorUpdater.decodeSingleDictionaryId`.

### Why are the changes needed?

`VectorizedRleValuesReader` and `VectorizedColumnReader` are becoming increasingly harder to maintain, as any change touches the above logic **will need to be replicated in 20+ places**. The issue becomes even more serious when we are going to implement column index (for instance, see how the change [here](https://github.com/apache/spark/pull/32753/files#diff-a01e174e178366aadf07f64ee690d47d343b2ca416a4a2b2ea735887c22d5934R191) has to be replicated multiple times) and complex type support (in progress) for the vectorized path.

In addition, currently dictionary decoding (see `VectorizedColumnReader.decodeDictionaryIds`) and non-dictionary decoding are handled separately, and therefore the same (very complicated) branching logic based on input Spark & Parquet types have to be replicated in two places, which is another burden for code maintenance.

The original intention is for performance. However these days JIT compilers tend to be very effective on this and will inline virtual calls aggressively to eliminate the method invocation costs (see [this](https://shipilev.net/blog/2015/black-magic-method-dispatch/) and [this](http://insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/)). I've also done benchmarks using a modified `DataSourceReadBenchmark` and `DateTimeRebaseBenchmark` and the result is almost exact the same before and after the change. The results can be found [here](https://gist.github.com/sunchao/674afbf942ccc2370bdcfa33efb4471c), and [here's](https://github.com/sunchao/spark/tree/parquet-refactor) the source code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32777 from sunchao/SPARK-35640.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2021-06-11 05:39:43 +00:00
Yuming Wang 463daabd5a [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9
### What changes were proposed in this pull request?

This pr upgrades built-in Hive to 2.3.9. Hive 2.3.9 changes:
- [HIVE-17155] - findConfFile() in HiveConf.java has some issues with the conf path
- [HIVE-24797] - Disable validate default values when parsing Avro schemas
- [HIVE-24608] - Switch back to get_table in HMS client for Hive 2.3.x
- [HIVE-21200] - Vectorization: date column throwing java.lang.UnsupportedOperationException for parquet
- [HIVE-21563] - Improve Table#getEmptyTable performance by disabling registerAllFunctionsOnce
- [HIVE-19228] - Remove commons-httpclient 3.x usage

### Why are the changes needed?

Fix regression caused by AVRO-2035.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32750 from wangyum/SPARK-34512.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-10 20:44:35 -07:00
Gengliang Wang d21ff1318f [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to DateType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32869 from gengliangwang/castToDate.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-10 23:37:02 +03:00
Kousuke Saruta 44b695fbb0 [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions
### What changes were proposed in this pull request?

This PR fixes an issue that `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

### Why are the changes needed?

The current implementation of `CollectMetricsExec` doesn't consider the case it can handle multiple partitions.
Because new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raise an assertion error.
This is a simple reproducible example.
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
	at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
	at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
	at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32786 from sarutak/fix-collectmetricsexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-11 01:20:35 +08:00
Emil Ejbyfeldt e2e3fe7782 [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
using CatalystTypeConverters.createToScalaConverter. This is how it is
done in MapObjects and that correctly handles Arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]       |-- key: integer
[info]       |-- value: struct (valueContainsNull = true)
[info]       |    |-- i: integer (nullable = false)
[info]       |    |-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString), input[0, map<int,struct<i:int,s:string>>, true], interface scala.collection.immutable.Map
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
[info]   :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
```
So using a map with cases classes for keys or values and using the interpreted path would incorrect deserialize data from the catalyst representation.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-10 09:37:27 -07:00
Gengliang Wang 4180692135 [SPARK-35711][SQL] Support casting of timestamp without time zone to timestamp type
### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to TimestampType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32864 from gengliangwang/castToTimestamp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-10 23:03:52 +08:00
Terry Kim 88f1d82a46 [SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in CheckAnalysis.scala
### What changes were proposed in this pull request?

#31637 removed the usage of `CheckAnalysis.checkAlterTablePartition` but didn't remove the function.

### Why are the changes needed?

To removed an unused function.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32855 from imback82/SPARK-34524-followup.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 12:43:09 +00:00
Fu Chen 5280f02747 [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery
### What changes were proposed in this pull request?

Use `UnresolvedHint.resolved = child.resolved` instead `UnresolvedHint.resolved = false`, then the plan contains `UnresolvedHint` child can be optimized by rule in batch `Resolution`.

For instance, before this pr, the following plan can't be optimized by `ResolveReferences`.
```
!'Project [*]
 +- SubqueryAlias __auto_generated_subquery_name
    +- UnresolvedHint use_hash
       +- Project [42 AS 42#10]
          +- OneRowRelation
```

### Why are the changes needed?

fix hint in subquery bug

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32841 from cfmcgrady/SPARK-35673.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 15:32:10 +08:00
dgd-contributor aa3de40773 [SPARK-35679][SQL] instantToMicros overflow
### Why are the changes needed?
With Long.minValue cast to an instant, secs will be floored in function microsToInstant and cause overflow when multiply with Micros_per_second

```
def microsToInstant(micros: Long): Instant = {
  val secs = Math.floorDiv(micros, MICROS_PER_SECOND)
  // Unfolded Math.floorMod(us, MICROS_PER_SECOND) to reuse the result of
  // the above calculation of `secs` via `floorDiv`.
  val mos = micros - secs * MICROS_PER_SECOND  <- it will overflow here
  Instant.ofEpochSecond(secs, mos * NANOS_PER_MICROS)
}
```

But the overflow is acceptable because it won't produce any change to the result

However, when convert the instant back to micro value, it will raise Overflow Error

```
def instantToMicros(instant: Instant): Long = {
  val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) <- It overflow here
  val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
  result
}
```

Code to reproduce this error
```
instantToMicros(microToInstant(Long.MinValue))
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test added

Closes #32839 from dgd-contributor/SPARK-35679_instantToMicro.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-06-10 08:08:51 +03:00
ulysses-you 8dde20a993 [SPARK-35675][SQL] EnsureRequirements remove shuffle should respect PartitioningCollection
### What changes were proposed in this pull request?

Add `PartitioningCollection` in EnsureRequirements during remove shuffle.

### Why are the changes needed?

Currently `EnsureRequirements` only check if child has semantic equal `HashPartitioning` and remove
redundant shuffle. We can enhance this case using `PartitioningCollection`.

### Does this PR introduce _any_ user-facing change?

Yes, plan might be changed.

### How was this patch tested?

Add test.

Closes #32815 from ulysses-you/shuffle-node.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-06-10 13:03:47 +08:00
Linhong Liu 87d2ffbbcf [MINOR][SQL] No need to normolize name for built-in functions
### What changes were proposed in this pull request?
Add an `internalRegisterFunction` for the built-in function registry. So that
we can skip the unnecessary function normalization.

### Why are the changes needed?
small refactor

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing ut

Closes #32842 from linhongliu-db/function-refactor.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 04:35:26 +00:00
Kousuke Saruta 7e99b65295 [SPARK-35194][SQL][FOLLOWUP] Change Seq to collections.Seq in NestedColumnAliasing to work with Scala 2.13
### What changes were proposed in this pull request?

This PR changes an occurrence of `Seq` to `collections.Seq` in `NestedColumnAliasing`.

### Why are the changes needed?

In the current master, `NestedColumnAliasing` doesn't work with Scala 2.13 and the relevant tests fail.
The following are examples.

* `NestedColumnAliasingSuite`
* Subclasses of `SchemaPruningSuite`
* `ColumnPruningSuite`

```
NestedColumnAliasingSuite:
[info] - Pushing a single nested field projection *** FAILED *** (14 milliseconds)
[info]   scala.MatchError: (none#211451,ArrayBuffer(name#211451.middle)) (of class scala.Tuple2)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.$anonfun$getAttributeToExtractValues$5(NestedColumnAliasing.scala:258)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap(StrictOptimizedMapOps.scala:31)
[info]   at scala.collection.StrictOptimizedMapOps.flatMap$(StrictOptimizedMapOps.scala:30)
[info]   at scala.collection.immutable.HashMap.flatMap(HashMap.scala:39)
[info]   at org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasing$.getAttributeToExtractValues(NestedColumnAliasing.scala:258)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran tests mentioned above and all passed with Scala 2.13.

Closes #32848 from sarutak/followup-SPARK-35194-2.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-10 02:14:40 +00:00
Kousuke Saruta 94b66f5e28 [MINOR][SQL] Modify the example of rand and randn
### What changes were proposed in this pull request?

This PR fixes the examples of `rand` and `randn`.

### Why are the changes needed?

SPARK-23643 (#20793) fixes an issue which is related to the seed and it causes the result of `rand` and `randn`.
Now the results of `SELECT rand(0)` and `SELECT randn((null)` are `0.7604953758285915` and `1.6034991609278433` respectively, and they should be deterministic because the number of partitions are always 1 (the leaf node is `OneRowRelation`).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Built the doc and confirmed it.
![rand-doc](https://user-images.githubusercontent.com/4736016/121359059-145a9b80-c96e-11eb-84c2-2f2b313614f3.png)

Closes #32844 from sarutak/rand-example.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-10 10:37:38 +09:00
Gengliang Wang 74b3df86f3 [SPARK-35698][SQL] Support casting of timestamp without time zone to strings
### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to StringType.

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32846 from gengliangwang/tswtzToString.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-10 02:29:37 +08:00
allisonwang-db f49bf1a072 [SPARK-34382][SQL] Support LATERAL subqueries
### What changes were proposed in this pull request?
This PR adds support for lateral subqueries. A lateral subquery is a subquery preceded by the `LATERAL` keyword in the FROM clause of a query that can reference columns in the preceding FROM items. For example:
```sql
SELECT * FROM t1, LATERAL (SELECT * FROM t2 WHERE t1.a = t2.c)
```
A new subquery expression`LateralSubquery` is used to represent a lateral subquery. It is similar to `ScalarSubquery` but can return multiple rows and columns. A new logical unary node `LateralJoin` is used to represent a lateral join.

Here is the analyzed plan for the above query:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a], Inner
   :  +- Project [c, d]
   :     +- Filter (outer(a) = c)
   :        +- Relation [c, d]
   +- Relation [a, b]
```

Similar to a correlated subquery, a lateral subquery can be viewed as a dependent (nested loop) join where the evaluation of the right subtree depends on the current value of the left subtree.  The same technique to decorrelate a subquery is used to decorrelate a lateral join:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a && a = c], Inner  // pull up correlated predicates as join conditions
   :  +- Project [c, d]
   :     +- Relation [c, d]
   +- Relation [a, b]
```
Then the lateral join can be rewritten into a normal join:
```scala
Join Inner (a = c)
:- Relation [a, b]
+- Relation [c, d]
```

#### Follow-ups:
1. Similar to rewriting correlated scalar subqueries, rewriting lateral joins is also subject to the COUNT bug (See SPARK-15370 for more details). This is **not** handled in the current PR as it requires a sizeable amount of refactoring. It will be addressed in a subsequent PR (SPARK-35551).
2. Currently Spark does use outer query references to resolve star expressions in subqueries. This is not lateral subquery specific and can be handled in a separate PR (SPARK-35618)

### Why are the changes needed?
To support an ANSI SQL feature.

### Does this PR introduce _any_ user-facing change?
Yes. It allows users to use lateral subqueries in the FROM clause of a query.

### How was this patch tested?
- Parser test: `PlanParserSuite.scala`
- Analyzer test: `ResolveSubquerySuite.scala`
- Optimizer test: `PullupCorrelatedPredicatesSuite.scala`
- SQL test: `join-lateral.sql`, `postgreSQL/join.sql`

Closes #32303 from allisonwang-db/spark-34382-lateral.

Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-09 17:08:32 +00:00
Gengliang Wang 313dc2d4ed [SPARK-35697][SQL][TESTS] Test TimestampWithoutTZType as ordered and atomic type
### What changes were proposed in this pull request?
Add `TimestampWithoutTZType` to `DataTypeTestUtils.ordered`/`atomicTypes`, and implement values generation of those types in `LiteralGenerator`/`RandomDataGenerator`. In this way, the types will be tested automatically in:
1. ArithmeticExpressionSuite:
    - "function least"
    - "function greatest"
2. PredicateSuite
    - "BinaryComparison consistency check"
    - "AND, OR, EqualTo, EqualNullSafe consistency check"
3. ConditionalExpressionSuite
    - "if"
4. RandomDataGeneratorSuite
    - "Basic types"
5. CastSuite
    - "null cast"
    - "up-cast"
    - "SPARK-27671: cast from nested null type in struct"
6. OrderingSuite
    - "GenerateOrdering with TimestampWithoutTZType"
7. PredicateSuite
    - "IN with different types"
8. UnsafeRowSuite
    - "calling get(ordinal, datatype) on null columns"
9. SortSuite
    - "sorting on TimestampWithoutTZType ..."

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites.

Closes #32843 from gengliangwang/atomicTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-09 15:19:25 +00:00