Commit graph

7213 commits

Author SHA1 Message Date
Gabor Somogyi 991f7e81d4 [SPARK-32001][SQL] Create JDBC authentication provider developer API
### What changes were proposed in this pull request?
At the moment only the baked in JDBC connection providers can be used but there is a need to support additional databases and use-cases. In this PR I'm proposing a new developer API name `JdbcConnectionProvider`. To show how an external JDBC connection provider can be implemented I've created an example [here](https://github.com/gaborgsomogyi/spark-jdbc-connection-provider).

The PR contains the following changes:
* Added connection provider developer API
* Made JDBC connection providers constructor to noarg => needed to load them w/ service loader
* Connection providers are now loaded w/ service loader
* Added tests to load providers independently
* Moved `SecurityConfigurationLock` into a central place because other areas will change global JVM security config

### Why are the changes needed?
No custom authentication possibility.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
* Existing + additional unit tests
* Docker integration tests
* Tested manually the newly created external JDBC connection provider

Closes #29024 from gaborgsomogyi/SPARK-32001.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-02 13:04:40 +09:00
Cheng Su d6f3138352 [SPARK-32859][SQL] Introduce physical rule to decide bucketing dynamically
### What changes were proposed in this pull request?

This PR is to add support to decide bucketed table scan dynamically based on actual query plan. Currently bucketing is enabled by default (`spark.sql.sources.bucketing.enabled`=true), so for all bucketed tables in the query plan, we will use bucket table scan (all input files per the bucket will be read by same task). This has the drawback that if the bucket table scan is not benefitting at all (no join/groupby/etc in the query), we don't need to use bucket table scan as it would restrict the # of tasks to be # of buckets and might hurt parallelism.

The feature is to add a physical plan rule right after `EnsureRequirements`:

The rule goes through plan nodes. For all operators which has "interesting partition" (i.e., require `ClusteredDistribution` or `HashClusteredDistribution`), check if the sub-plan for operator has `Exchange` and bucketed table scan (and only allow certain operators in plan (i.e. `Scan/Filter/Project/Sort/PartialAgg/etc`.), see details in `DisableUnnecessaryBucketedScan.disableBucketWithInterestingPartition`). If yes, disable the bucketed table scan in the sub-plan. In addition, disabling bucketed table scan if there's operator with interesting partition along the sub-plan.

Why the algorithm works is that if there's a shuffle between the bucketed table scan and operator with interesting partition, then bucketed table scan partitioning will be destroyed by the shuffle operator in the middle, and we don't need bucketed table scan for sure.

The idea of "interesting partition" is inspired from "interesting order" in "Access Path Selection in a Relational Database Management System"(http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf), after discussion with cloud-fan .

### Why are the changes needed?

To avoid unnecessary bucketed scan in the query, and this is prerequisite for https://github.com/apache/spark/pull/29625 (decide bucketed sorted scan dynamically will be added later in that PR).

### Does this PR introduce _any_ user-facing change?

A new config `spark.sql.sources.bucketing.autoBucketedScan.enabled` is introduced which set to false by default (the rule is disabled by default as it can regress cached bucketed table query, see discussion in https://github.com/apache/spark/pull/29804#issuecomment-701151447). User can opt-in/opt-out by enabling/disabling the config, as we found in prod, some users rely on assumption of # of tasks == # of buckets when reading bucket table to precisely control # of tasks. This is a bad assumption but it does happen on our side, so leave a config here to allow them opt-out for the feature.

### How was this patch tested?

Added unit tests in `DisableUnnecessaryBucketedScanSuite.scala`

Closes #29804 from c21/bucket-rule.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-10-02 09:01:15 +09:00
ulysses e62d24717e [SPARK-32585][SQL] Support scala enumeration in ScalaReflection
### What changes were proposed in this pull request?

Add code in `ScalaReflection` to support scala enumeration and make enumeration type as string type in Spark.

### Why are the changes needed?

We support java enum but failed with scala enum, it's better to keep the same behavior.

Here is a example.

```
package test

object TestEnum extends Enumeration {
  type TestEnum = Value
  val E1, E2, E3 = Value
}
import TestEnum._
case class TestClass(i: Int,  e: TestEnum) {
}

import test._
Seq(TestClass(1, TestEnum.E1)).toDS
```

Before this PR
```
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for test.TestEnum.TestEnum
- field (class: "scala.Enumeration.Value", name: "e")
- root class: "test.TestClass"
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:567)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:882)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:881)
```

After this PR
`org.apache.spark.sql.Dataset[test.TestClass] = [i: int, e: string]`

### Does this PR introduce _any_ user-facing change?

Yes, user can make case class which include scala enumeration field as dataset.

### How was this patch tested?

Add test.

Closes #29403 from ulysses-you/SPARK-32585.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2020-10-01 15:58:01 -04:00
Max Gekk 5651284c3b [SPARK-32992][SQL] Map Oracle's ROWID type to StringType in read via JDBC
### What changes were proposed in this pull request?
Convert the `ROWID` type in the Oracle JDBC dialect to Catalyst's `StringType`. The doc for Oracle 19c says explicitly that the type must be string: https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Data-Types.html#GUID-AEF1FE4C-2DE5-4BE7-BB53-83AD8F1E34EF

### Why are the changes needed?
To avoid the exception showed in https://stackoverflow.com/questions/52244492/spark-jdbc-dataframereader-fails-to-read-oracle-table-with-datatype-as-rowid

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
N/A

Closes #29884 from MaxGekk/jdbc-oracle-rowid-string.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-10-01 14:50:32 +09:00
Takeshi Yamamuro 3a299aa648 [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
### What changes were proposed in this pull request?

Some plan transformations (e.g., `RemoveNoopOperators`) implicitly assume the same `ExprId` refers to the unique attribute. But, `RuleExecutor` does not check this integrity between logical plan transformations. So, this PR intends to add this check in `isPlanIntegral` of `Analyzer`/`Optimizer`.

This PR comes from the talk with cloud-fan viirya in https://github.com/apache/spark/pull/29485#discussion_r475346278

### Why are the changes needed?

For better logical plan integrity checking.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29585 from maropu/PlanIntegrityTest.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-30 21:37:29 +09:00
tanel.kiis@gmail.com 90e86f6fac [SPARK-32970][SPARK-32019][SQL][TEST] Reduce the runtime of an UT for
### What changes were proposed in this pull request?

The UT for SPARK-32019 (#28853) tries to write about 16GB of data do the disk. We must change the value of `spark.sql.files.maxPartitionBytes` to a smaller value do check the correct behavior with less data. By default it is `128MB`.
The other parameters in this UT are also changed to smaller values to keep the behavior the same.

### Why are the changes needed?

The runtime of this one UT can be over 7 minutes on Jenkins. After the change it is few seconds.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #29842 from tanelk/SPARK-32970.

Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-29 16:51:44 +09:00
Max Gekk 1b60ff5afe [MINOR][DOCS] Document when current_date and current_timestamp are evaluated
### What changes were proposed in this pull request?
Explicitly document that `current_date` and `current_timestamp` are executed at the start of query evaluation. And all calls of `current_date`/`current_timestamp` within the same query return the same value

### Why are the changes needed?
Users could expect that `current_date` and `current_timestamp` return the current date/timestamp at the moment of query execution but in fact the functions are folded by the optimizer at the start of query evaluation:
0df8dd6073/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala (L71-L91)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
by running `./dev/scalastyle`.

Closes #29892 from MaxGekk/doc-current_date.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-29 05:20:12 +00:00
gengjiaan a53fc9b7ae [SPARK-27951][SQL][FOLLOWUP] Improve the window function nth_value
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/29604 supports the ANSI SQL NTH_VALUE.
We should override the `prettyName` and `sql`.

### Why are the changes needed?
Make the name of nth_value correct.
To show the ignoreNulls parameter correctly.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Jenkins test.

Closes #29886 from beliefer/improve-nth_value.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-29 09:54:43 +09:00
Yuanjian Li 9e6882feca [SPARK-32885][SS] Add DataStreamReader.table API
### What changes were proposed in this pull request?
This pr aims to add a new `table` API in DataStreamReader, which is similar to the table API in DataFrameReader.

### Why are the changes needed?
Users can directly use this API to get a Streaming DataFrame on a table. Below is a simple example:

Application 1 for initializing and starting the streaming job:

```
val path = "/home/yuanjian.li/runtime/to_be_deleted"
val tblName = "my_table"

// Write some data to `my_table`
spark.range(3).write.format("parquet").option("path", path).saveAsTable(tblName)

// Read the table as a streaming source, write result to destination directory
val table = spark.readStream.table(tblName)
table.writeStream.format("parquet").option("checkpointLocation", "/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2")
```

Application 2 for appending new data:

```
// Append new data into the path
spark.range(5).write.format("parquet").option("path", "/home/yuanjian.li/runtime/to_be_deleted").mode("append").save()
```

Check result:
```
// The desitination directory should contains all written data
spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show()
```

### Does this PR introduce _any_ user-facing change?
Yes, a new API added.

### How was this patch tested?
New UT added and integrated testing.

Closes #29756 from xuanyuanking/SPARK-32885.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-25 06:50:24 +00:00
Terry Kim e9c98c910a [SPARK-32990][SQL] Migrate REFRESH TABLE to use UnresolvedTableOrView to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate `REFRESH TABLE` to use `UnresolvedTableOrView` to resolve the table/view identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

The current behavior is not consistent between v1 and v2 commands when resolving a temp view.
In v2, the `t` in the following example is resolved to a table:
```scala
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE testcat.ns")
sql("REFRESH TABLE t") // 't' is resolved to testcat.ns.t
```
whereas in v1, the `t` is resolved to a temp view:
```scala
sql("CREATE DATABASE test")
sql("CREATE TABLE spark_catalog.test.t (id bigint) USING csv")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE spark_catalog.test")
sql("REFRESH TABLE t") // 't' is resolved to a temp view
```

### Does this PR introduce _any_ user-facing change?

After this PR, `REFRESH TABLE t` is resolved to a temp view `t` instead of `testcat.ns.t`.

### How was this patch tested?

Added a new test

Closes #29866 from imback82/refresh_table_consistent.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-25 04:29:09 +00:00
Chao Sun 8ccfbc114e [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core
<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
  7. If you want to add a new configuration, please read the guideline first for naming configurations in
     'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->

This moves and refactors the parallel listing utilities from `InMemoryFileIndex` to Spark core so it can be reused by modules beside SQL. Along the process this also did some cleanups/refactorings:

- Created a `HadoopFSUtils` class under core
- Moved `InMemoryFileIndex.bulkListLeafFiles` into `HadoopFSUtils.parallelListLeafFiles`. It now depends on a `SparkContext` instead of `SparkSession` in SQL. Also added a few parameters which used to be read from `SparkSession.conf`: `ignoreMissingFiles`, `ignoreLocality`, `parallelismThreshold`, `parallelismMax ` and `filterFun` (for additional filtering support but we may be able to merge this with `filter` parameter in future).
- Moved `InMemoryFileIndex.listLeafFiles` into `HadoopFSUtils.listLeafFiles` with similar changes above.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Currently the locality-aware parallel listing mechanism only applies to `InMemoryFileIndex`. By moving this to core, we can potentially reuse the same mechanism for other code paths as well.

### Does this PR introduce _any_ user-facing change?
<!--
Note that it means *any* user-facing change including all aspects such as the documentation fix.
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
If no, write 'No'.
-->

No.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->

Since this is mostly a refactoring, it relies on existing unit tests such as those for `InMemoryFileIndex`.

Closes #29471 from sunchao/SPARK-32381.

Lead-authored-by: Chao Sun <sunchao@apache.org>
Co-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Chao Sun <sunchao@uber.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-09-24 10:58:52 -07:00
Russell Spitzer b3f0087e39 [SPARK-32977][SQL][DOCS] Fix JavaDoc on Default Save Mode
### What changes were proposed in this pull request?

The default is always ErrorsOnExist regardless of DataSource version. Fixing the JavaDoc to reflect this.

### Why are the changes needed?

To fix documentation

### Does this PR introduce _any_ user-facing change?

Doc change.

### How was this patch tested?

Manual.

Closes #29853 from RussellSpitzer/SPARK-32977.

Authored-by: Russell Spitzer <russell.spitzer@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-23 20:02:20 -07:00
Michael Munday faeb71b39d [SPARK-32950][SQL] Remove unnecessary big-endian code paths
### What changes were proposed in this pull request?
Remove unnecessary code.

### Why are the changes needed?

General housekeeping. Might be a slight performance improvement, especially on big-endian systems.

There is no need for separate code paths for big- and little-endian
platforms in putDoubles and putFloats anymore (since PR #24861). On
all platforms values are encoded in native byte order and can just
be copied directly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #29815 from mundaym/clean-putfloats.

Authored-by: Michael Munday <mike.munday@ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-23 12:38:06 -05:00
Terry Kim 21b7479797 [SPARK-32959][SQL][TEST] Fix an invalid test in DataSourceV2SQLSuite
### What changes were proposed in this pull request?

This PR addresses two issues related to the `Relation: view text` test in `DataSourceV2SQLSuite`.

1. The test has the following block:
```scala
withView("view1") { v1: String =>
  sql(...)
}
```
Since `withView`'s signature is `withView(v: String*)(f: => Unit): Unit`, the `f` that will be executed is ` v1: String => sql(..)`, which is just defining the anonymous function, and _not_ executing it.

2. Once the test is fixed to run, it actually fails. The reason is that the v2 session catalog implementation used in tests does not correctly handle `V1Table` for views in `loadTable`. And this results in views resolved to `ResolvedTable` instead of `ResolvedView`, causing the test failure: f1dc479d39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (L1007-L1011)

### Why are the changes needed?

Fixing a bug in test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test.

Closes #29811 from imback82/fix_minor_test.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-23 05:49:45 +00:00
tanel.kiis@gmail.com acfee3c8b1 [SPARK-32870][DOCS][SQL] Make sure that all expressions have their ExpressionDescription filled
### What changes were proposed in this pull request?

Made sure, that all the expressions in the `FunctionRegistry ` have the fields `usage`, `examples` and `since` filled in their `ExpressionDescription`. Added UT to `ExpressionInfoSuite`, to make sure, that all new expressions will also fill those fields.

### Why are the changes needed?

Documentation improvement

### Does this PR introduce _any_ user-facing change?

Better generated SQL built in functions documentation

### How was this patch tested?

Checked the fix version in the following jiras:
SPARK-1251 - UnaryMinus, Add, Subtract, Multiply, Divide, Remainder, Explode, Not, In, And, Or, Equals, LessThan, LessThanOrEqual, GreaterThan, GreaterThanOrEqual, If, Cast
SPARK-2053 - CaseWhen
SPARK-2665 - EqualNullSafe
SPARK-3176 - Abs
SPARK-6542 - CreateStruct
SPARK-7135 - MonotonicallyIncreasingID
SPARK-7152 - SparkPartitionID
SPARK-7295 - bitwiseAND, bitwiseOR, bitwiseXOR, bitwiseNOT
SPARK-8005 - InputFileName
SPARK-8203 - Greatest
SPARK-8204 - Least
SPARK-8220 - UnaryPositive
SPARK-8221 - Pmod
SPARK-8230 - Size
SPARK-8231 - ArrayContains
SPARK-8232 - SortArray
SPARK-8234 - md5
SPARK-8235 - sha1
SPARK-8236 - crc32
SPARK-8237 - sha2
SPARK-8240 - Concat
SPARK-8246 - GetJsonObject
SPARK-8407 - CreateNamedStruct
SPARK-9617 - JsonTuple
SPARK-10810 - CurrentDatabase
SPARK-12480 - Murmur3Hash
SPARK-14061 - CreateMap
SPARK-14160 - TimeWindow
SPARK-14580 - AssertTrue
SPARK-16274 - XPathBoolean
SPARK-16278 - MapKeys
SPARK-16279 - MapValues
SPARK-16284 - CallMethodViaReflection
SPARK-16286 - Stack
SPARK-16288 - Inline
SPARK-16289 - PosExplode
SPARK-16318 - XPathShort, XPathInt, XPathLong, XPathFloat, XPathDouble, XPathString, XPathList
SPARK-16730 - Cast aliases
SPARK-17495 - HiveHash
SPARK-18702 - InputFileBlockStart, InputFileBlockLength
SPARK-20910 - UUID

Closes #29743 from tanelk/SPARK-32870.

Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-23 10:18:38 +09:00
Max Gekk b53da23a28 [MINOR][SQL] Improve examples for percentile_approx()
### What changes were proposed in this pull request?
In the PR, I propose to replace current examples for `percentile_approx()` with **only one** input value by example **with multiple values** in the input column.

### Why are the changes needed?
Current examples are pretty trivial, and don't demonstrate function's behaviour on a sequence of values.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- by running `ExpressionInfoSuite`
- `./dev/scalastyle`

Closes #29841 from MaxGekk/example-percentile_approx.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-23 09:41:38 +09:00
Max Gekk 7c14f177eb [SPARK-32306][SQL][DOCS] Clarify the result of percentile_approx()
### What changes were proposed in this pull request?
More precise description of the result of the `percentile_approx()` function and its synonym `approx_percentile()`. The proposed sentence clarifies that  the function returns **one of elements** (or array of elements) from the input column.

### Why are the changes needed?
To improve Spark docs and avoid misunderstanding of the function behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
`./dev/scalastyle`

Closes #29835 from MaxGekk/doc-percentile_approx.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-09-22 12:45:19 -07:00
Wenchen Fan fba5736c50 [SPARK-32757][SQL][FOLLOWUP] Preserve the attribute name as possible as we scan in SubqueryBroadcastExec
### What changes were proposed in this pull request?

This is a minor followup of https://github.com/apache/spark/pull/29601 , to preserve the attribute name in `SubqueryBroadcastExec.output`.

### Why are the changes needed?

During explain, it's better to see the origin column name instead of always "key".

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests.

Closes #29839 from cloud-fan/followup2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-22 11:05:35 -07:00
Wenchen Fan 6145621495 [SPARK-32659][SQL][FOLLOWUP] Broadcast Array instead of Set in InSubqueryExec
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/29475.

This PR updates the code to broadcast the Array instead of Set, which was the behavior before #29475

### Why are the changes needed?

The size of Set can be much bigger than Array. It's safer to keep the behavior the same as before and build the set at the executor side.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #29838 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-22 08:49:58 -07:00
Peter Toth f03c03576a [SPARK-32951][SQL] Foldable propagation from Aggregate
### What changes were proposed in this pull request?
This PR adds foldable propagation from `Aggregate` as per: https://github.com/apache/spark/pull/29771#discussion_r490412031

### Why are the changes needed?
This is an improvement as `Aggregate`'s `aggregateExpressions` can contain foldables that can be propagated up.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT.

Closes #29816 from peter-toth/SPARK-32951-foldable-propagation-from-aggregate.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-21 21:43:17 -07:00
zero323 7fb9f6884f [SPARK-32799][R][SQL] Add allowMissingColumns to SparkR unionByName
### What changes were proposed in this pull request?

Add optional `allowMissingColumns` argument to SparkR `unionByName`.

### Why are the changes needed?

Feature parity.

### Does this PR introduce _any_ user-facing change?

`unionByName` supports `allowMissingColumns`.

### How was this patch tested?

Existing unit tests. New unit tests targeting this feature.

Closes #29813 from zero323/SPARK-32799.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-21 09:39:34 +09:00
yangjie01 2128c4f14b [SPARK-32808][SQL] Pass all test of sql/core module in Scala 2.13
### What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/29660 and https://github.com/apache/spark/pull/29689 there are 13 remaining  failed cases of sql core module with Scala 2.13.

The reason for the remaining failed cases is the optimization result of `CostBasedJoinReorder` maybe different with same input in Scala 2.12 and Scala 2.13 if there are more than one same cost candidate plans.

In this pr give a way to make the  optimization result deterministic as much as possible to pass all remaining failed cases of `sql/core` module in Scala 2.13, the main change of this pr as follow:

- Change to use `LinkedHashMap` instead of `Map` to store `foundPlans` in `JoinReorderDP.search` method to ensure same iteration order with same insert order because iteration order of `Map` behave differently under Scala 2.12 and 2.13

- Fixed `StarJoinCostBasedReorderSuite` affected by the above change

- Regenerate golden files affected by the above change.

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Action

- Scala 2.13: All tests passed.

Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests  -pl sql/core -Pscala-2.13 -am
mvn test -pl sql/core -Pscala-2.13
```

**Before**
```
Tests: succeeded 8485, failed 13, canceled 1, ignored 52, pending 0
*** 13 TESTS FAILED ***

```

**After**

```
Tests: succeeded 8498, failed 0, canceled 1, ignored 52, pending 0
All tests passed.
```

Closes #29711 from LuciferYang/SPARK-32808-3.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-18 10:38:30 -05:00
gengjiaan 8b09536cdf [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
### What changes were proposed in this pull request?
The `NTH_VALUE` function is an ANSI SQL.
For examples:
```
CREATE TEMPORARY TABLE empsalary (
    depname varchar,
    empno bigint,
    salary int,
    enroll_date date
);

INSERT INTO empsalary VALUES
('develop', 10, 5200, '2007-08-01'),
('sales', 1, 5000, '2006-10-01'),
('personnel', 5, 3500, '2007-12-10'),
('sales', 4, 4800, '2007-08-08'),
('personnel', 2, 3900, '2006-12-23'),
('develop', 7, 4200, '2008-01-01'),
('develop', 9, 4500, '2008-01-01'),
('sales', 3, 4800, '2007-08-01'),
('develop', 8, 6000, '2006-10-01'),
('develop', 11, 5200, '2007-08-15');

select first_value(salary) over(order by salary range between 1000 preceding and 1000 following),
	lead(salary) over(order by salary range between 1000 preceding and 1000 following),
	nth_value(salary, 1) over(order by salary range between 1000 preceding and 1000 following),
	salary from empsalary;
 first_value | lead | nth_value | salary
-------------+------+-----------+--------
        3500 | 3900 |      3500 |   3500
        3500 | 4200 |      3500 |   3900
        3500 | 4500 |      3500 |   4200
        3500 | 4800 |      3500 |   4500
        3900 | 4800 |      3900 |   4800
        3900 | 5000 |      3900 |   4800
        4200 | 5200 |      4200 |   5000
        4200 | 5200 |      4200 |   5200
        4200 | 6000 |      4200 |   5200
        5000 |      |      5000 |   6000
(10 rows)
```

There are some mainstream database support the syntax.

**PostgreSQL:**
https://www.postgresql.org/docs/8.4/functions-window.html

**Vertica:**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Functions/Analytic/NTH_VALUEAnalytic.htm?tocpath=SQL%20Reference%20Manual%7CSQL%20Functions%7CAnalytic%20Functions%7C_____23

**Oracle:**
https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/NTH_VALUE.html#GUID-F8A0E88C-67E5-4AA6-9515-95D03A7F9EA0

**Redshift**
https://docs.aws.amazon.com/redshift/latest/dg/r_WF_NTH.html

**Presto**
https://prestodb.io/docs/current/functions/window.html

**MySQL**
https://www.mysqltutorial.org/mysql-window-functions/mysql-nth_value-function/

### Why are the changes needed?
The `NTH_VALUE` function is an ANSI SQL.
The `NTH_VALUE` function is very useful.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Exists and new UT.

Closes #29604 from beliefer/support-nth_value.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-18 07:06:38 +00:00
Takeshi Yamamuro b49aaa33e1 [SPARK-32906][SQL] Struct field names should not change after normalizing floats
### What changes were proposed in this pull request?

This PR intends to fix a minor bug when normalizing floats for struct types;
```
scala> import org.apache.spark.sql.execution.aggregate.HashAggregateExec
scala> val df = Seq(Tuple1(Tuple1(-0.0d)), Tuple1(Tuple1(0.0d))).toDF("k")
scala> val agg = df.distinct()
scala> agg.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[k#40], functions=[])
+- Exchange hashpartitioning(k#40, 200), true, [id=#62]
   +- *(1) HashAggregate(keys=[knownfloatingpointnormalized(if (isnull(k#40)) null else named_struct(col1, knownfloatingpointnormalized(normalizenanandzero(k#40._1)))) AS k#40], functions=[])
      +- *(1) LocalTableScan [k#40]

scala> val aggOutput = agg.queryExecution.sparkPlan.collect { case a: HashAggregateExec => a.output.head }
scala> aggOutput.foreach { attr => println(attr.prettyJson) }
### Final Aggregate ###
[ {
  "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
  "num-children" : 0,
  "name" : "k",
  "dataType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "_1",
                ^^^
      "type" : "double",
      "nullable" : false,
      "metadata" : { }
    } ]
  },
  "nullable" : true,
  "metadata" : { },
  "exprId" : {
    "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
    "id" : 40,
    "jvmId" : "a824e83f-933e-4b85-a1ff-577b5a0e2366"
  },
  "qualifier" : [ ]
} ]

### Partial Aggregate ###
[ {
  "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
  "num-children" : 0,
  "name" : "k",
  "dataType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "col1",
                ^^^^
      "type" : "double",
      "nullable" : true,
      "metadata" : { }
    } ]
  },
  "nullable" : true,
  "metadata" : { },
  "exprId" : {
    "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
    "id" : 40,
    "jvmId" : "a824e83f-933e-4b85-a1ff-577b5a0e2366"
  },
  "qualifier" : [ ]
} ]
```

### Why are the changes needed?

bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #29780 from maropu/FixBugInNormalizedFloatingNumbers.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-09-17 22:07:47 -07:00
Max Gekk 75dd86400c [SPARK-32908][SQL] Fix target error calculation in percentile_approx()
### What changes were proposed in this pull request?
1. Change the target error calculation according to the paper [Space-Efficient Online Computation of Quantile Summaries](http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf). It says that the error `e = max(gi, deltai)/2` (see the page 59). Also this has clear explanation [ε-approximate quantiles](http://www.mathcs.emory.edu/~cheung/Courses/584/Syllabus/08-Quantile/Greenwald.html#proofprop1).
2. Added a test to check different accuracies.
3. Added an input CSV file `percentile_approx-input.csv.bz2` to the resource folder `sql/catalyst/src/main/resources` for the test.

### Why are the changes needed?
To fix incorrect percentile calculation, see an example in SPARK-32908.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
- By running existing tests in `QuantileSummariesSuite` and in `ApproximatePercentileQuerySuite`.
- Added new test `SPARK-32908: maximum target error in percentile_approx` to `ApproximatePercentileQuerySuite`.

Closes #29784 from MaxGekk/fix-percentile_approx-2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-18 10:47:06 +09:00
Takeshi Yamamuro 68e0d5f296 [SPARK-32902][SQL] Logging plan changes for AQE
### What changes were proposed in this pull request?

Recently, we added code to log plan changes in the preparation phase in `QueryExecution` for execution (https://github.com/apache/spark/pull/29544). This PR intends to apply the same fix  for logging plan changes in AQE.

### Why are the changes needed?

Easy debugging for AQE plans

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #29774 from maropu/PlanChangeLogForAQE.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-18 08:29:29 +09:00
Peter Toth 4ced58862c [SPARK-32635][SQL] Fix foldable propagation
### What changes were proposed in this pull request?
This PR rewrites `FoldablePropagation` rule to replace attribute references in a node with foldables coming only from the node's children.

Before this PR in the case of this example (with setting`spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation`):
```scala
val a = Seq("1").toDF("col1").withColumn("col2", lit("1"))
val b = Seq("2").toDF("col1").withColumn("col2", lit("2"))
val aub = a.union(b)
val c = aub.filter($"col1" === "2").cache()
val d = Seq("2").toDF( "col4")
val r = d.join(aub, $"col2" === $"col4").select("col4")
val l = c.select("col2")
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
df.show()
```
foldable propagation happens incorrectly:
```
 Join LeftOuter, (col2#6 = col4#34)                                                              Join LeftOuter, (col2#6 = col4#34)
!:- Project [col2#6]                                                                             :- Project [1 AS col2#6]
 :  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)   :  +- InMemoryRelation [col1#4, col2#6], StorageLevel(disk, memory, deserialized, 1 replicas)
 :        +- Union                                                                               :        +- Union
 :           :- *(1) Project [value#1 AS col1#4, 1 AS col2#6]                                    :           :- *(1) Project [value#1 AS col1#4, 1 AS col2#6]
 :           :  +- *(1) Filter (isnotnull(value#1) AND (value#1 = 2))                            :           :  +- *(1) Filter (isnotnull(value#1) AND (value#1 = 2))
 :           :     +- *(1) LocalTableScan [value#1]                                              :           :     +- *(1) LocalTableScan [value#1]
 :           +- *(2) Project [value#10 AS col1#13, 2 AS col2#15]                                 :           +- *(2) Project [value#10 AS col1#13, 2 AS col2#15]
 :              +- *(2) Filter (isnotnull(value#10) AND (value#10 = 2))                          :              +- *(2) Filter (isnotnull(value#10) AND (value#10 = 2))
 :                 +- *(2) LocalTableScan [value#10]                                             :                 +- *(2) LocalTableScan [value#10]
 +- Project [col4#34]                                                                            +- Project [col4#34]
    +- Join Inner, (col2#6 = col4#34)                                                               +- Join Inner, (col2#6 = col4#34)
       :- Project [value#31 AS col4#34]                                                                :- Project [value#31 AS col4#34]
       :  +- LocalRelation [value#31]                                                                  :  +- LocalRelation [value#31]
       +- Project [col2#6]                                                                             +- Project [col2#6]
          +- Union false, false                                                                           +- Union false, false
             :- Project [1 AS col2#6]                                                                        :- Project [1 AS col2#6]
             :  +- LocalRelation [value#1]                                                                   :  +- LocalRelation [value#1]
             +- Project [2 AS col2#15]                                                                       +- Project [2 AS col2#15]
                +- LocalRelation [value#10]                                                                     +- LocalRelation [value#10]

```
and so the result is wrong:
```
+----+----+
|col2|col4|
+----+----+
|   1|null|
+----+----+
```

After this PR foldable propagation will not happen incorrectly and the result is correct:
```
+----+----+
|col2|col4|
+----+----+
|   2|   2|
+----+----+
```

### Why are the changes needed?
To fix a correctness issue.

### Does this PR introduce _any_ user-facing change?
Yes, fixes a correctness issue.

### How was this patch tested?
Existing and new UTs.

Closes #29771 from peter-toth/SPARK-32635-fix-foldable-propagation.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-18 08:17:23 +09:00
jzc ea3b979e95 [SPARK-32889][SQL] orc table column name supports special characters
### What changes were proposed in this pull request?
make orc table column name support special characters like `$`

### Why are the changes needed?
Special characters like `$` are allowed in orc table column name by Hive.
But it's error when execute command "CREATE TABLE tbl(`$` INT, b INT) using orc" in spark. it's not compatible with Hive.

`Column name "$" contains invalid character(s). Please use alias to rename it.;Column name "$" contains invalid character(s). Please use alias to rename it.;org.apache.spark.sql.AnalysisException: Column name "$" contains invalid character(s). Please use alias to rename it.;
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.checkFieldName(OrcFileFormat.scala:51)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.$anonfun$checkFieldNames$1(OrcFileFormat.scala:59)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$.$anonfun$checkFieldNames$1$adapted(OrcFileFormat.scala:59)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) `

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add unit test

Closes #29761 from jzc928/orcColSpecialChar.

Authored-by: jzc <jzc@jzcMacBookPro.local>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-17 14:50:47 -07:00
yangjie01 5817c584b8 [SPARK-32909][SQL] Pass all sql/hive-thriftserver module UTs in Scala 2.13
### What changes were proposed in this pull request?

This pr fix failed and aborted cases in sql hive-thriftserver module in Scala 2.13, the main change of this pr as follow:

- Use `s.c.Seq` instead of `Seq` in `HiveResult` because the input type maybe `mutable.ArraySeq`, but `Seq` represent `immutable.Seq` in Scala 2.13.

- Reset classLoader after `HiveMetastoreLazyInitializationSuite` completed because context class loader is `NonClosableMutableURLClassLoader`  in `HiveMetastoreLazyInitializationSuite` running process, and it propagate to `HiveThriftServer2ListenerSuite` trigger following problems in Scala 2.13:

```
HiveThriftServer2ListenerSuite:
*** RUN ABORTED ***
  java.lang.LinkageError: loader constraint violation: loader (instance of net/bytebuddy/dynamic/loading/MultipleParentClassLoader) previously initiated loading for a different type with name "org/apache/hive/service/ServiceStateChangeListener"
  at org.mockito.codegen.HiveThriftServer2$MockitoMock$1850222569.<clinit>(Unknown Source)
  at sun.reflect.GeneratedSerializationConstructorAccessor530.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:48)
  at org.objenesis.ObjenesisBase.newInstance(ObjenesisBase.java:73)
  at org.mockito.internal.creation.instance.ObjenesisInstantiator.newInstance(ObjenesisInstantiator.java:19)
  at org.mockito.internal.creation.bytebuddy.SubclassByteBuddyMockMaker.createMock(SubclassByteBuddyMockMaker.java:47)
  at org.mockito.internal.creation.bytebuddy.ByteBuddyMockMaker.createMock(ByteBuddyMockMaker.java:25)
  at org.mockito.internal.util.MockUtil.createMock(MockUtil.java:35)
  at org.mockito.internal.MockitoCore.mock(MockitoCore.java:63)
  ...
```

After this pr `HiveThriftServer2Suites` and `HiveThriftServer2ListenerSuite` was fixed and all 461 test passed

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action

- Scala 2.13: All tests passed.

Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/hive-thriftserver -am -Phive-thriftserver -Pscala-2.13
mvn test -pl sql/hive-thriftserver -Phive -Phive-thriftserver -Pscala-2.13
```

**Before**

```
HiveThriftServer2ListenerSuite:
*** RUN ABORTED ***
```

**After**

```
Tests: succeeded 461, failed 0, canceled 0, ignored 17, pending 0
All tests passed.
```

Closes #29783 from LuciferYang/sql-thriftserver-tests.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-17 14:35:01 -07:00
Chao Sun 482a79a5e3 [SPARK-24994][SQL][FOLLOW-UP] Handle foldable, timezone and cleanup
### What changes were proposed in this pull request?

This is a follow-up on #29565, and addresses a few issues in the last PR:
- style issue pointed by [this comment](https://github.com/apache/spark/pull/29565#discussion_r487646749)
- skip optimization when `fromExp` is foldable (by [this comment](https://github.com/apache/spark/pull/29565#discussion_r487646973)) as there could be more efficient rule to apply for this case.
- pass timezone info to the generated cast on the literal value
- a bunch of cleanups and test improvements

Originally I plan to handle this when implementing [SPARK-32858](https://issues.apache.org/jira/browse/SPARK-32858) but now think it's better to isolate these changes from that.

### Why are the changes needed?

To fix a few left over issues in the above PR.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test for the foldable case. Otherwise relying on existing tests.

Closes #29775 from sunchao/SPARK-24994-followup.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-09-17 07:50:39 -07:00
sychen 92b75dc260 [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
### What changes were proposed in this pull request?
Write to static partition, check in advance that the partition field is empty.

### Why are the changes needed?
When writing to the current static partition, the partition field is empty, and an error will be reported when all tasks are completed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add ut

Closes #29316 from cxzl25/SPARK-32508.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-17 06:50:30 +00:00
Jungtaek Lim (HeartSaVioR) d936cb328d [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
### What changes were proposed in this pull request?

Credits to tdas who reported and described the fix to [SPARK-26425](https://issues.apache.org/jira/browse/SPARK-26425). I just followed the description of the issue.

This patch adds more checks on commit log as well as file streaming source so that multiple concurrent runs of streaming query don't mess up the status of query/checkpoint. This patch addresses two different spots which are having a bit different issues:

1. FileStreamSource.fetchMaxOffset()

In structured streaming, we don't allow multiple streaming queries to run with same checkpoint (including concurrent runs of same query), so query should fail if it fails to write the metadata of specific batch ID due to same batch ID being written by others.

2. commit log

As described in JIRA issue, assertion is already applied to the `offsetLog` for the same reason.

8167714cab/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala (L394-L402)

This patch applied the same for commit log.

### Why are the changes needed?

This prevents the inconsistent behavior on streaming query and lets query fail instead.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A, as the change is simple and obvious, and it's really hard to artificially reproduce the issue.

Closes #25965 from HeartSaVioR/SPARK-26425.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-09-17 09:01:06 +09:00
Linhong Liu 40ef5c91ad [SPARK-32816][SQL] Fix analyzer bug when aggregating multiple distinct DECIMAL columns
### What changes were proposed in this pull request?
This PR fixes a conflict between `RewriteDistinctAggregates` and `DecimalAggregates`.
In some cases, `DecimalAggregates` will wrap the decimal column to `UnscaledValue` using
different rules for different aggregates.

This means, same distinct column with different aggregates will change to different distinct columns
after `DecimalAggregates`. For example:
`avg(distinct decimal_col), sum(distinct decimal_col)` may change to
`avg(distinct UnscaledValue(decimal_col)), sum(distinct decimal_col)`

We assume after `RewriteDistinctAggregates`, there will be at most one distinct column in aggregates,
but `DecimalAggregates` breaks this assumption. To fix this, we have to switch the order of these two
rules.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
added test cases

Closes #29673 from linhongliu-db/SPARK-32816.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-16 16:53:25 +00:00
Liang-Chi Hsieh 550c1c9cfb [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
### What changes were proposed in this pull request?

This proposes to enhance user document of the API for loading a Dataset of strings storing CSV rows. If the header option is set to true, the API will remove all lines same with the header.

### Why are the changes needed?

This behavior can confuse users. We should explicitly document it.

### Does this PR introduce _any_ user-facing change?

No. Only doc change.

### How was this patch tested?

Only doc change.

Closes #29765 from viirya/SPARK-32888.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-16 20:16:15 +09:00
allisonwang-db 2e3aa2f023 [SPARK-32861][SQL] GenerateExec should require column ordering
### What changes were proposed in this pull request?
This PR updates the `RemoveRedundantProjects` rule to make `GenerateExec` require column ordering.

### Why are the changes needed?
`GenerateExec` was originally considered as a node that does not require column ordering. However, `GenerateExec` binds its input rows directly with its `requiredChildOutput` without using the child's output schema.
In `doExecute()`:
```scala
val proj = UnsafeProjection.create(output, output)
```
In `doConsume()`:
```scala
val values = if (requiredChildOutput.nonEmpty) {
  input
} else {
  Seq.empty
}
```
In this case, changing input column ordering will result in `GenerateExec` binding the wrong schema to the input columns. For example, if we do not require child columns to be ordered, the `requiredChildOutput` [a, b, c] will directly bind to the schema of the input columns [c, b, a], which is incorrect:
```
GenerateExec explode(array(a, b, c)), [a, b, c], false, [d]
  HashAggregate(keys=[a, b, c], functions=[], output=[c, b, a])
    ...
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #29734 from allisonwang-db/generator.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-16 06:05:35 +00:00
HyukjinKwon b46c7302db [SPARK-32704][SQL][TESTS][FOLLOW-UP] Check any physical rule instead of a specific rule in the test
### What changes were proposed in this pull request?

This PR only checks if there's any physical rule runs instead of a specific rule. This is rather just a trivial fix to make the tests more robust.

In fact, I faced a test failure from a in-house fork that applies a different physical rule that makes `CollapseCodegenStages` ineffective.

### Why are the changes needed?

To make the test more robust by unrelated changes.

### Does this PR introduce _any_ user-facing change?

No, test-only

### How was this patch tested?

Manually tested. Jenkins tests should pass.

Closes #29766 from HyukjinKwon/SPARK-32704.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-16 12:06:57 +09:00
HyukjinKwon 108c4c8fdc [SPARK-32481][SQL][TESTS][FOLLOW-UP] Skip the test if trash directory cannot be created
### What changes were proposed in this pull request?

This PR skips the test if trash directory cannot be created. It is possible that the trash directory cannot be created, for example, by permission. And the test fails below:

```
- SPARK-32481 Move data to trash on truncate table if enabled *** FAILED *** (154 milliseconds)
  fs.exists(trashPath) was false (DDLSuite.scala:3184)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
  at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503)
```

### Why are the changes needed?

To make the tests pass independently.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually tested.

Closes #29759 from HyukjinKwon/SPARK-32481.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-16 08:11:43 +09:00
ulysses 888b343587 [SPARK-32827][SQL] Add spark.sql.maxMetadataStringLength config
### What changes were proposed in this pull request?

Add a new config `spark.sql.maxMetadataStringLength`. This config aims to limit metadata value length, e.g. file location.

### Why are the changes needed?

Some metadata have been abbreviated by `...` when I tried to add some test in `SQLQueryTestSuite`. We need to replace such value to `notIncludedMsg`. That caused we can't replace that like location value by `className` since the `className` has been abbreviated.

Here is a case:
```
CREATE table  explain_temp1 (key int, val int) USING PARQUET;

EXPLAIN EXTENDED SELECT sum(distinct val) FROM explain_temp1;

-- ignore parsed,analyzed,optimized
-- The output like
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(distinct cast(val#x as bigint)#xL)], output=[sum(DISTINCT val)#xL])
+- Exchange SinglePartition, true, [id=#x]
   +- *HashAggregate(keys=[], functions=[partial_sum(distinct cast(val#x as bigint)#xL)], output=[sum#xL])
      +- *HashAggregate(keys=[cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
         +- Exchange hashpartitioning(cast(val#x as bigint)#xL, 4), true, [id=#x]
            +- *HashAggregate(keys=[cast(val#x as bigint) AS cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
               +- *ColumnarToRow
                  +- FileScan parquet default.explain_temp1[val#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/runner/work/spark/spark/sql/core/spark-warehouse/org.apache.spark.sq...], PartitionFilters: ...
```

### Does this PR introduce _any_ user-facing change?

No, a new config.

### How was this patch tested?

new test.

Closes #29688 from ulysses-you/SPARK-32827.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-15 14:11:30 +00:00
herman c8baab1a1f [SPARK-32879][SQL] Refactor SparkSession initial options
### What changes were proposed in this pull request?
This PR refactors the way we propagate the options from the `SparkSession.Builder` to the` SessionState`. This currently done via a mutable map inside the SparkSession. These setting settings are then applied **after** the Session. This is a bit confusing when you expect something to be set when constructing the `SessionState`. This PR passes the options as a constructor parameter to the `SessionStateBuilder` and this will set the options when the configuration is created.

### Why are the changes needed?
It makes it easier to reason about the configurations set in a SessionState than before. We recently had an incident where someone was using `SparkSessionExtensions` to create a planner rule that relied on a conf to be set. While this is in itself probably incorrect usage, it still illustrated this somewhat funky behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #29752 from hvanhovell/SPARK-32879.

Authored-by: herman <herman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-15 06:24:54 +00:00
Dongjoon Hyun d8a0d85692 [SPARK-32884][TESTS] Mark TPCDSQuery*Suite as ExtendedSQLTest
### What changes were proposed in this pull request?

This PR aims to mark the following suite as `ExtendedSQLTest` to reduce GitHub Action test time.
- TPCDSQuerySuite
- TPCDSQueryANSISuite
- TPCDSQueryWithStatsSuite

### Why are the changes needed?

Currently, the longest GitHub Action task is `Build and test / Build modules: sql - other tests` with `1h 57m 10s` while `Build and test / Build modules: sql - slow tests` takes `42m 20s`. With this PR, we can move the workload from `other tests` to `slow tests` task and reduce the total waiting time about 7 ~ 8 minutes.

### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.

### How was this patch tested?

Pass the GitHub Action with the reduced running time.

Closes #29755 from dongjoon-hyun/SPARK-SLOWTEST.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-15 14:38:01 +09:00
Kousuke Saruta 4fac6d501a [SPARK-32871][BUILD] Append toMap to Map#filterKeys if the result of filter is concatenated with another Map for Scala 2.13
### What changes were proposed in this pull request?

This PR appends `toMap` to `Map` instances with `filterKeys` if such maps is to be concatenated with another maps.

### Why are the changes needed?

As of Scala 2.13, Map#filterKeys return a MapView, not the original Map type.
This can cause compile error.
```
/sql/DataFrameReader.scala:279: type mismatch;
[error]  found   : Iterable[(String, String)]
[error]  required: java.util.Map[String,String]
[error] Error occurred in an application involving default arguments.
[error]       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Compile passed with the following command.
`build/mvn -Pscala-2.13 -Phive -Phive-thriftserver -Pyarn -Pkubernetes -DskipTests test-compile`

Closes #29742 from sarutak/fix-filterKeys-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-15 09:27:47 +09:00
Yuanjian Li 5e825482d7 [SPARK-32844][SQL] Make DataFrameReader.table take the specified options for datasource v1
### What changes were proposed in this pull request?
Make `DataFrameReader.table` take the specified options for datasource v1.

### Why are the changes needed?
Keep the same behavior of v1/v2 datasource, the v2 fix has been done in SPARK-32592.

### Does this PR introduce _any_ user-facing change?
Yes. The DataFrameReader.table will take the specified options. Also, if there are the same key and value exists in specified options and table properties, an exception will be thrown.

### How was this patch tested?
New UT added.

Closes #29712 from xuanyuanking/SPARK-32844.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-14 09:20:24 +00:00
Cheng Su 978f531010 [SPARK-32854][SS] Minor code and doc improvement for stream-stream join
### What changes were proposed in this pull request?

Several minor code and documentation improvement for stream-stream join. Specifically:

* Remove extending from `SparkPlan`, as extending from `BinaryExecNode` is enough.
* Return `left/right.outputPartitioning` for `Left/RightOuter` in `outputPartitioning`, as the `PartitioningCollection` wrapper is unnecessary (similar to batch joins `ShuffledHashJoinExec`, `SortMergeJoinExec`).
*  Avoid per-row check for join type (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L486-L492), by creating the method before the loop of reading rows (`generateFilteredJoinedRow` in `storeAndJoinWithOtherSide`). Similar optimization (i.e. create auxiliary method/variable per different join type before the iterator of input rows) has been done in batch join world (`SortMergeJoinExec`, `ShuffledHashJoinExec`).
* Minor fix for comment/indentation for better readability.

### Why are the changes needed?

Minor optimization to avoid per-row unnecessary work (this probably can be optimized away by compiler, but we can do a better join to avoid it at the first place). And other comment/indentation fix to have better code readability for future developers.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests in `StreamingJoinSuite.scala` as no new logic is introduced.

Closes #29724 from c21/streaming.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-14 08:49:51 +00:00
Kousuke Saruta b121f0d459 [SPARK-32873][BUILD] Fix code which causes error when build with sbt and Scala 2.13
### What changes were proposed in this pull request?

This PR fix code which causes error when build with sbt and Scala 2.13 like as follows.
```
[error] [warn] /home/kou/work/oss/spark-scala-2.13/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala:251: method with a single empty parameter list overrides method without any parameter list
[error] [warn]   override def hasNext(): Boolean = requestOffset < part.untilOffset
[error] [warn]
[error] [warn] /home/kou/work/oss/spark-scala-2.13/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala:294: method with a single empty parameter list overrides method without any parameter list
[error] [warn]   override def hasNext(): Boolean = okNext
```

More specifically, what this PR fixes are

* Methods which has an empty parameter list and overrides an method which has no parameter list.
```
override def hasNext(): Boolean = okNext
```

* Methods which has no parameter list and overrides an method which has an empty parameter list.
```
      override def next: (Int, Double) = {
```

* Infix operator expression that the operator wraps.
```
    3L * math.min(k, numFeatures) * math.min(k, numFeatures)
    3L * math.min(k, numFeatures) * math.min(k, numFeatures) +
    + math.max(math.max(k, numFeatures), 4L * math.min(k, numFeatures)
      math.max(math.max(k, numFeatures), 4L * math.min(k, numFeatures) *
    * math.min(k, numFeatures) + 4L * math.min(k, numFeatures))
```

### Why are the changes needed?

For building Spark with sbt and Scala 2.13.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

After this change and #29742 applied, compile passed with the following command.
```
build/sbt -Pscala-2.13  -Phive -Phive-thriftserver -Pyarn -Pkubernetes compile test:compile
```

Closes #29745 from sarutak/fix-code-for-sbt-and-spark-2.13.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-14 15:34:58 +09:00
Chao Sun a6d6ea3efe [SPARK-32802][SQL] Avoid using SpecificInternalRow in RunLengthEncoding#Encoder
### What changes were proposed in this pull request?

Currently `RunLengthEncoding#Encoder` uses `SpecificInternalRow` as a holder for the current value when calculating compression stats and doing the actual compression. It calls `ColumnType.copyField` and `ColumnType.getField` on the internal row which incurs extra cost comparing to directly operating on the internal type. This proposes to replace the `SpecificInternalRow` with `T#InternalType` to avoid the extra cost.

### Why are the changes needed?

Operating on `SpecificInternalRow` carries certain cost and negatively impact performance when using `RunLengthEncoding` for compression.

With the change I see some improvements through `CompressionSchemeBenchmark`:

```diff
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 BOOLEAN Encode:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                    1              1           0      51957.0           0.0       1.0X
-RunLengthEncoding(2.502)                            549            555           9        122.2           8.2       0.0X
-BooleanBitSet(0.125)                                296            301           3        226.6           4.4       0.0X
+PassThrough(1.000)                                    2              2           0      42985.4           0.0       1.0X
+RunLengthEncoding(2.517)                            487            500          10        137.7           7.3       0.0X
+BooleanBitSet(0.125)                                348            353           4        192.8           5.2       0.0X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 SHORT Encode (Lower Skew):                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                    3              3           0      22779.9           0.0       1.0X
-RunLengthEncoding(1.520)                           1186           1192           9         56.6          17.7       0.0X
+PassThrough(1.000)                                    3              4           0      21216.6           0.0       1.0X
+RunLengthEncoding(1.493)                            882            931          50         76.1          13.1       0.0X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 SHORT Encode (Higher Skew):               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                    3              4           0      21352.2           0.0       1.0X
-RunLengthEncoding(2.009)                           1173           1175           3         57.2          17.5       0.0X
+PassThrough(1.000)                                    3              3           0      22388.6           0.0       1.0X
+RunLengthEncoding(2.015)                            924            941          23         72.6          13.8       0.0X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 INT Encode (Lower Skew):                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                    9             10           1       7410.1           0.1       1.0X
-RunLengthEncoding(1.000)                           1499           1502           4         44.8          22.3       0.0X
-DictionaryEncoding(0.500)                           621            630          11        108.0           9.3       0.0X
-IntDelta(0.250)                                     134            149          10        502.0           2.0       0.1X
+PassThrough(1.000)                                    9             10           1       7575.9           0.1       1.0X
+RunLengthEncoding(1.002)                            952            966          12         70.5          14.2       0.0X
+DictionaryEncoding(0.500)                           561            567           6        119.7           8.4       0.0X
+IntDelta(0.250)                                     129            134           3        521.9           1.9       0.1X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 INT Encode (Higher Skew):                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                    9             10           1       7668.3           0.1       1.0X
-RunLengthEncoding(1.332)                           1561           1685         175         43.0          23.3       0.0X
-DictionaryEncoding(0.501)                           616            642          21        108.9           9.2       0.0X
-IntDelta(0.250)                                     126            131           2        533.4           1.9       0.1X
+PassThrough(1.000)                                    9             10           1       7494.1           0.1       1.0X
+RunLengthEncoding(1.336)                            974            987          13         68.9          14.5       0.0X
+DictionaryEncoding(0.501)                           709            719          10         94.6          10.6       0.0X
+IntDelta(0.250)                                     127            132           4        528.4           1.9       0.1X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 LONG Encode (Lower Skew):                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                   18             19           1       3803.0           0.3       1.0X
-RunLengthEncoding(0.754)                           1526           1540          20         44.0          22.7       0.0X
-DictionaryEncoding(0.250)                           735            759          33         91.3          11.0       0.0X
-LongDelta(0.125)                                    126            129           2        530.8           1.9       0.1X
+PassThrough(1.000)                                   19             21           1       3543.5           0.3       1.0X
+RunLengthEncoding(0.747)                           1049           1058          12         63.9          15.6       0.0X
+DictionaryEncoding(0.250)                           620            634          17        108.2           9.2       0.0X
+LongDelta(0.125)                                    129            132           2        520.1           1.9       0.1X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 LONG Encode (Higher Skew):                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                   18             20           1       3705.4           0.3       1.0X
-RunLengthEncoding(1.002)                           1665           1669           6         40.3          24.8       0.0X
-DictionaryEncoding(0.251)                           890            901          11         75.4          13.3       0.0X
-LongDelta(0.125)                                    125            130           3        537.2           1.9       0.1X
+PassThrough(1.000)                                   18             20           2       3726.8           0.3       1.0X
+RunLengthEncoding(0.999)                           1076           1077           2         62.4          16.0       0.0X
+DictionaryEncoding(0.251)                           904            919          19         74.3          13.5       0.0X
+LongDelta(0.125)                                    125            131           4        536.5           1.9       0.1X

 OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.5
 Intel(R) Core(TM) i9-9880H CPU  2.30GHz
 STRING Encode:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-PassThrough(1.000)                                   27             30           2       2497.1           0.4       1.0X
-RunLengthEncoding(0.892)                           3443           3587         204         19.5          51.3       0.0X
-DictionaryEncoding(0.167)                          2286           2290           6         29.4          34.1       0.0X
+PassThrough(1.000)                                   28             31           2       2430.2           0.4       1.0X
+RunLengthEncoding(0.889)                           1798           1800           3         37.3          26.8       0.0X
+DictionaryEncoding(0.167)                          1956           1959           4         34.3          29.1       0.0X
```

In the above diff, new results are with changes in this PR. It can be seen that encoding performance has improved quite a lot especially for string type.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Relies on existing unit tests.

Closes #29654 from sunchao/SPARK-32802.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-12 22:19:30 -07:00
Chao Sun 3d08084022 [SPARK-24994][SQL] Add UnwrapCastInBinaryComparison optimizer to simplify literal types
### What changes were proposed in this pull request?

Currently, in cases like the following:
```sql
SELECT * FROM t WHERE age < 40
```
where `age` is of short type, Spark won't be able to simplify this and can only generate filter `cast(age, int) < 40`. This won't get pushed down to datasources and therefore is not optimized.

This PR proposes a optimizer rule to improve this when the following constraints are satisfied:
 - input expression is binary comparisons when one side is a cast operation and another is a literal.
 - both the cast child expression and literal are of integral type (i.e., byte, short, int or long)

When this is true, it tries to do several optimizations to either simplify the expression or move the cast to the literal side, so
result filter for the above case becomes `age < cast(40 as smallint)`. This is better since the cast can be optimized away later and the filter can be pushed down to data sources.

This PR follows a similar effort in Presto (https://prestosql.io/blog/2019/05/21/optimizing-the-casts-away.html). Here we only handles integral types but plan to extend to other types as follow-ups.

### Why are the changes needed?

As mentioned in the previous section, when cast is not optimized, it cannot be pushed down to data sources which can lead
to unnecessary IO and therefore longer job time and waste of resources. This helps to improve that.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests for both the optimizer rule and filter pushdown on datasource level for both Orc and Parquet.

Closes #29565 from sunchao/SPARK-24994.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-12 21:34:35 -07:00
Takeshi Yamamuro 4269c2c252 [SPARK-32851][SQL][TEST] Tests should fail if errors happen when generating projection code
### What changes were proposed in this pull request?

This PR intends to set `CODEGEN_ONLY` at `CODEGEN_FACTORY_MODE` in test spark context so that tests can fail if errors happen when generating expr code.

### Why are the changes needed?

I noticed that the code generation of `SafeProjection` failed in the existing test (https://issues.apache.org/jira/browse/SPARK-32828) but it passed because `FALLBACK` was set at `CODEGEN_FACTORY_MODE` (by default) in `SharedSparkSession`. To get aware of these failures quickly, I think its worth setting `CODEGEN_ONLY` at `CODEGEN_FACTORY_MODE`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29721 from maropu/ExprCodegenTest.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-12 08:42:07 +09:00
Dongjoon Hyun b4be6a6d12 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
### What changes were proposed in this pull request?

This PR aims to add `sinkParameter`  to check sink options robustly and independently in DataStreamReaderWriterSuite

### Why are the changes needed?

`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`.

To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness.

```scala
val query = df.writeStream.
   ...
   .start()
assert(LastOptions.paramters(..))
query.stop()
```

### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.

### How was this patch tested?

Pass the newly updated test case.

Closes #29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-11 11:48:34 -07:00
Peter Toth 94cac5978c [SPARK-32730][SQL][FOLLOW-UP] Improve LeftAnti SortMergeJoin right side buffering
### What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/29572.

LeftAnti SortMergeJoin should not buffer all matching right side rows when bound condition is empty, this is unnecessary and can lead to performance degradation especially when spilling happens.

### Why are the changes needed?

Performance improvement.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

Closes #29727 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin-followup.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-11 13:42:33 +00:00
Wenchen Fan 9f4f49cbaa [SPARK-32853][SQL] Consecutive save/load calls in DataFrame/StreamReader/Writer should not fail
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/29328

In https://github.com/apache/spark/pull/29328 , we forbid the use case that path option and path parameter are both specified.  However,  it breaks some use cases:
```
val dfr =  spark.read.format(...).option(...)
dfr.load(path1).xxx
dfr.load(path2).xxx
```

The reason is that: `load` has side effects. It will set path option to the `DataFrameReader` instance. The next time you call `load`, Spark will fail because both path option and path parameter are specified.

This PR removes the side effect of `save`/`load`/`start`  to not set the path option.

### Why are the changes needed?

recover some use cases

### Does this PR introduce _any_ user-facing change?

Yes, some use cases fail before this PR, and can run successfully after this PR.

### How was this patch tested?

new tests

Closes #29723 from cloud-fan/df.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-11 06:15:58 -07:00
Wenchen Fan 328d81a2d1 [SPARK-32677][SQL][DOCS][MINOR] Improve code comment in CreateFunctionCommand
### What changes were proposed in this pull request?

We made a mistake in https://github.com/apache/spark/pull/29502, as there is no code comment to explain why we can't load the UDF class when creating functions. This PR improves the code comment.

### Why are the changes needed?

To avoid making the same mistake.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #29713 from cloud-fan/comment.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-11 09:22:56 +09:00
Kousuke Saruta 5f468cc21e [SPARK-32822][SQL] Change the number of partitions to zero when a range is empty with WholeStageCodegen disabled or falled back
### What changes were proposed in this pull request?

This PR changes the behavior of RangeExec with WholeStageCodegen disabled or falled back to change the number of partitions to zero when a range is empty.

In the current master, if WholeStageCodegen effects, the number of partitions of an empty range will be changed to zero.
```
spark.range(1, 1, 1, 1000).rdd.getNumPartitions
res0: Int = 0
```
But it doesn't if WholeStageCodegen is disabled or falled back.
```
spark.conf.set("spark.sql.codegen.wholeStage", false)
spark.range(1, 1, 1, 1000).rdd.getNumPartitions
res2: Int = 1000
```

### Why are the changes needed?

To archive better performance even though WholeStageCodegen disabled or falled back.

### Does this PR introduce _any_ user-facing change?

Yes. the number of partitions gotten with `getNumPartitions` for an empty range will be changed when WholeStageCodegen is disabled.

### How was this patch tested?

New test.

Closes #29681 from sarutak/zero-size-range.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-11 09:11:35 +09:00
gengjiaan a22871f50a [SPARK-32777][SQL] Aggregation support aggregate function with multiple foldable expressions
### What changes were proposed in this pull request?
Spark SQL exists a bug show below:

```
spark.sql(
  " SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2, 3)")
  .show()
+-----------------+--------------------+
|count(DISTINCT 2)|count(DISTINCT 2, 3)|
+-----------------+--------------------+
|                1|                   1|
+-----------------+--------------------+

spark.sql(
  " SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3, 2)")
  .show()
+-----------------+--------------------+
|count(DISTINCT 2)|count(DISTINCT 3, 2)|
+-----------------+--------------------+
|                1|                   0|
+-----------------+--------------------+
```
The first query is correct, but the second query is not.
The root reason is the second query rewrited by `RewriteDistinctAggregates` who expand the output but lost the 2.

### Why are the changes needed?
Fix a bug.
`SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3, 2)` should return `1, 1`

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
New UT

Closes #29626 from beliefer/support-multiple-foldable-distinct-expressions.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-10 11:25:32 +00:00
Kent Yao 5669b212ec [SPARK-32840][SQL] Invalid interval value can happen to be just adhesive with the unit
### What changes were proposed in this pull request?
In this PR, we add a checker for STRING form interval value ahead for parsing multiple units intervals and fail directly if the interval value contains alphabets to prevent correctness issues like `interval '1 day 2' day`=`3 days`.

### Why are the changes needed?

fix correctness issue

### Does this PR introduce _any_ user-facing change?

yes, in spark 3.0.0 `interval '1 day 2' day`=`3 days` but now we fail with ParseException
### How was this patch tested?

add a test.

Closes #29708 from yaooqinn/SPARK-32840.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-10 11:20:05 +00:00
Takeshi Yamamuro 7eb76d6988 [SPARK-32828][SQL] Cast from a derived user-defined type to a base type
### What changes were proposed in this pull request?

This PR intends to fix an existing bug below in `UserDefinedTypeSuite`;
```
[info] - SPARK-19311: UDFs disregard UDT type hierarchy (931 milliseconds)
16:22:35.936 WARN org.apache.spark.sql.catalyst.expressions.SafeProjection: Expr codegen error and falling back to interpreter mode
org.apache.spark.SparkException: Cannot cast org.apache.spark.sql.ExampleSubTypeUDT46b1771f to org.apache.spark.sql.ExampleBaseTypeUDT31e8d979.
	at org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:891)
	at org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:852)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:147)
    ...
```

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #29691 from maropu/FixUdtBug.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-10 19:19:26 +09:00
Jungtaek Lim (HeartSaVioR) 8f61005723 [SPARK-32456][SS][FOLLOWUP] Update doc to note about using SQL statement with streaming Dataset
### What changes were proposed in this pull request?

This patch proposes to update the doc (both SS guide doc and Dataset dropDuplicates method doc) to leave a note to check on using SQL statements with streaming Dataset.

Once end users create a temp view based on streaming Dataset, they won't bother with thinking about "streaming" and do whatever they do with batch query. In many cases it works, but not just smoothly for the case when streaming aggregation is involved. They still need to concern about maintaining state store.

### Why are the changes needed?

Although SPARK-32456 fixed the weird error message, as a side effect some operations are enabled on streaming workload via SQL statement, which is error-prone if end users don't indicate what they're doing.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Only doc change.

Closes #29461 from HeartSaVioR/SPARK-32456-FOLLOWUP-DOC.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-10 08:10:32 +00:00
Dongjoon Hyun 2f85f9516c [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
### What changes were proposed in this pull request?

This PR aims to fix indeterministic behavior on DataStreamReader/Writer options like the following.
```scala
scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
org.apache.spark.sql.AnalysisException: Path does not exist: 1;
```

### Why are the changes needed?

This will make the behavior deterministic.

### Does this PR introduce _any_ user-facing change?

Yes, but the previous behavior is indeterministic.

### How was this patch tested?

Pass the newly test cases.

Closes #29702 from dongjoon-hyun/SPARK-32832.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-09 23:41:32 -07:00
Jungtaek Lim (HeartSaVioR) db89b0e1b8 [SPARK-32831][SS] Refactor SupportsStreamingUpdate to represent actual meaning of the behavior
### What changes were proposed in this pull request?

This PR renames `SupportsStreamingUpdate` to `SupportsStreamingUpdateAsAppend` as the new interface name represents the actual behavior clearer. This PR also removes the `update()` method (so the interface is more likely a marker), as the implementations of `SupportsStreamingUpdateAsAppend` should support append mode by default, hence no need to trigger some flag on it.

### Why are the changes needed?

SupportsStreamingUpdate was intended to revive the functionality of Streaming update output mode for internal data sources, but despite the name, that interface isn't really used to do actual update on sink; all sinks are implementing this interface to do append, so strictly saying, it's just to support update as append. Renaming the interface would make it clear.

### Does this PR introduce _any_ user-facing change?

No, as the class is only for internal data sources.

### How was this patch tested?

Jenkins test will follow.

Closes #29693 from HeartSaVioR/SPARK-32831.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-09-10 15:33:18 +09:00
Liang-Chi Hsieh add267c4de [SPARK-32819][SQL] ignoreNullability parameter should be effective recursively
### What changes were proposed in this pull request?

This patch proposes to check `ignoreNullability` parameter recursively in `equalsStructurally` method.

### Why are the changes needed?

`equalsStructurally` is used to check type equality. We can optionally ask to ignore nullability check. But the parameter `ignoreNullability` is not passed recursively down to nested types. So it produces weird error like:

```
data type mismatch: argument 3 requires array<array<string>> type, however ... is of array<array<string>> type.
```

when running the query `select aggregate(split('abcdefgh',''), array(array('')), (acc, x) -> array(array( x ) ) )`.

### Does this PR introduce _any_ user-facing change?

Yes, fixed a bug when running user query.

### How was this patch tested?

Unit tests.

Closes #29698 from viirya/SPARK-32819.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-10 02:53:22 +00:00
Dongjoon Hyun 06a994517f [SPARK-32836][SS][TESTS] Fix DataStreamReaderWriterSuite to check writer options correctly
### What changes were proposed in this pull request?

This PR aims to fix the test coverage at `DataStreamReaderWriterSuite`.

### Why are the changes needed?

Currently, the test case checks `DataStreamReader` options instead of `DataStreamWriter` options.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the revised test case.

Closes #29701 from dongjoon-hyun/SPARK-32836.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-09 19:46:55 -07:00
Terry Kim ab2fa881ed [SPARK-32516][SQL][FOLLOWUP] Remove unnecessary check if path string is empty for DataFrameWriter.save(), DataStreamReader.load() and DataStreamWriter.start()
### What changes were proposed in this pull request?

This PR is a follow up to https://github.com/apache/spark/pull/29543#discussion_r485409606, which correctly points out that the check for the empty string is not necessary.

### Why are the changes needed?

The unnecessary check actually could cause more confusion.

For example,
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
java.lang.IllegalArgumentException: Can not create a Path from an empty string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:168)
```
even when `path` option is available. This PR addresses to fix this confusion.

### Does this PR introduce _any_ user-facing change?

Yes, now the above example prints the consistent exception message whether the path parameter value is empty or not.
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
org.apache.spark.sql.AnalysisException: There is a 'path' option set and save() is called with a path parameter. Either remove the path option, or call save() without the parameter. To ignore this check, set 'spark.sql.legacy.pathOptionBehavior.enabled' to 'true'.;
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:290)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:856)
  ... 47 elided
```

### How was this patch tested?

Added unit tests.

Closes #29697 from imback82/SPARK-32516-followup.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-10 01:48:51 +00:00
Wenchen Fan f7995c576a Revert "[SPARK-32677][SQL] Load function resource before create"
This reverts commit 05fcf26b79.
2020-09-09 18:15:22 +00:00
Tathagata Das e4237bbda6 [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources
### What changes were proposed in this pull request?
Make MicroBatchExecution explicitly call `getBatch` when the start and end offsets are the same.

### Why are the changes needed?

Structured Streaming micro-batch engine has the contract with V1 data sources that, after a restart, it will call `source.getBatch()` on the last batch attempted before the restart. However, a very rare combination of sequences violates this contract. It occurs only when
- The streaming query has specific types of stateful operations with watermarks (e.g., aggregation in append, mapGroupsWithState with timeouts).
    - These queries can execute a batch even without new data when the previous updates the watermark and the stateful ops are such that the new watermark can cause new output/cleanup. Such batches are called no-data-batches.
- The last batch before termination was an incomplete no-data-batch. Upon restart, the micro-batch engine fails to call `source.getBatch` when attempting to re-execute the incomplete no-data-batch.

This occurs because no-data-batches has the same and end offsets, and when a batch is executed, if the start and end offset is same then calling `source.getBatch` is skipped as it is assumed the generated plan will be empty. This only affects V1 data sources like Delta and Autoloader which rely on this invariant to detect in the source whether the query is being started from scratch or restarted.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

New unit test with a mock v1 source that fails without the fix.

Closes #29651 from tdas/SPARK-32794.

Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
2020-09-09 13:35:51 -04:00
yangjie01 513d51a2c5 [SPARK-32808][SQL] Fix some test cases of sql/core module in scala 2.13
### What changes were proposed in this pull request?
The purpose of this pr is to partial resolve [SPARK-32808](https://issues.apache.org/jira/browse/SPARK-32808), total of 26 failed test cases were fixed, the related suite as follow:

- `StreamingAggregationSuite` related test cases (2 FAILED -> Pass)

- `GeneratorFunctionSuite` related test cases (2 FAILED -> Pass)

- `UDFSuite` related test cases (2 FAILED -> Pass)

- `SQLQueryTestSuite` related test cases (5 FAILED -> Pass)

- `WholeStageCodegenSuite` related test cases (1 FAILED -> Pass)

- `DataFrameSuite` related test cases (3 FAILED -> Pass)

- `OrcV1QuerySuite\OrcV2QuerySuite` related test cases (4 FAILED -> Pass)

- `ExpressionsSchemaSuite` related test cases (1 FAILED -> Pass)

- `DataFrameStatSuite` related test cases (1 FAILED -> Pass)

- `JsonV1Suite\JsonV2Suite\JsonLegacyTimeParserSuite` related test cases (6 FAILED -> Pass)

The main change of this pr as following:

- Fix Scala 2.13 compilation problems in   `ShuffleBlockFetcherIterator`  and `Analyzer`

- Specified `Seq` to `scala.collection.Seq` in `objects.scala` and `GenericArrayData` because internal use `Seq` maybe `mutable.ArraySeq` and not easy to call `.toSeq`

- Should specified `Seq` to `scala.collection.Seq`  when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]` because the data maybe `mutable.ArraySeq` but `Seq` is `immutable.Seq` in Scala 2.13

- Use a compatible way to let `+` and `-` method  of `Decimal` having the same behavior in Scala 2.12 and Scala 2.13

- Call `toList` in `RelationalGroupedDataset.toDF` method when `groupingExprs` is `Stream` type because `Stream` can't serialize in Scala 2.13

- Add a manual sort to `classFunsMap` in `ExpressionsSchemaSuite` because `Iterable.groupBy` in Scala 2.13 has different result with `TraversableLike.groupBy`  in Scala 2.12

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?

Should specified `Seq` to `scala.collection.Seq`  when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]` because the data maybe `mutable.ArraySeq` but the `Seq` is `immutable.Seq` in Scala 2.13

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Action

- Scala 2.13: Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests  -pl sql/core -Pscala-2.13 -am
mvn test -pl sql/core -Pscala-2.13
```

**Before**
```
Tests: succeeded 8166, failed 319, canceled 1, ignored 52, pending 0
*** 319 TESTS FAILED ***

```

**After**

```
Tests: succeeded 8204, failed 286, canceled 1, ignored 52, pending 0
*** 286 TESTS FAILED ***

```

Closes #29660 from LuciferYang/SPARK-32808.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-09 08:53:44 -05:00
Liang-Chi Hsieh de0dc52a84 [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession
### What changes were proposed in this pull request?

If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-09 12:23:05 +09:00
Max Gekk adc8d687ce [SPARK-32810][SQL][TESTS][FOLLOWUP] Check path globbing in JSON/CSV datasources v1 and v2
### What changes were proposed in this pull request?
In the PR, I propose to move the test `SPARK-32810: CSV and JSON data sources should be able to read files with escaped glob metacharacter in the paths` from `DataFrameReaderWriterSuite` to `CSVSuite` and to `JsonSuite`. This will allow to run the same test in `CSVv1Suite`/`CSVv2Suite` and in `JsonV1Suite`/`JsonV2Suite`.

### Why are the changes needed?
To improve test coverage by checking JSON/CSV datasources v1 and v2.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running affected test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.csv.*"
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.json.*"
```

Closes #29684 from MaxGekk/globbing-paths-when-inferring-schema-dsv2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-09 10:29:58 +09:00
manuzhang 96ff87dce8 [SPARK-32753][SQL][FOLLOWUP] Fix indentation and clean up view in test
### What changes were proposed in this pull request?
Fix indentation and clean up view in the test added by https://github.com/apache/spark/pull/29593.

### Why are the changes needed?
Address review comments in https://github.com/apache/spark/pull/29665.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Updated test.

Closes #29682 from manuzhang/spark-32753-followup.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-09 10:20:21 +09:00
Zhenhua Wang e7d9a24565 [SPARK-32817][SQL] DPP throws error when broadcast side is empty
### What changes were proposed in this pull request?

In `SubqueryBroadcastExec.relationFuture`, if the `broadcastRelation` is an `EmptyHashedRelation`, then `broadcastRelation.keys()` will throw `UnsupportedOperationException`.

### Why are the changes needed?

To fix a bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test.

Closes #29671 from wzhfy/dpp_empty_broadcast.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-08 21:36:21 +09:00
sychen bd3dc2f54d [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe
### What changes were proposed in this pull request?
Before SPARK-31511 is fixed, `BytesToBytesMap` iterator() is not thread-safe and may cause data inaccuracy.
We need to add a unit test.

### Why are the changes needed?
Increase test coverage to ensure that iterator() is thread-safe.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
add ut

Closes #29669 from cxzl25/SPARK-31511-test.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-08 11:54:04 +00:00
Zhenhua Wang 55d38a479b [SPARK-32748][SQL] Revert "Support local property propagation in SubqueryBroadcastExec"
### What changes were proposed in this pull request?

This reverts commit 04f7f6dac0 due to the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207).

### Why are the changes needed?

Based on  the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207), propagation for thread local properties in `SubqueryBroadcastExec` is not necessary, since they will be propagated by broadcast exchange threads anyway.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Also revert the added test.

Closes #29674 from wzhfy/revert_dpp_thread_local.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-08 20:20:16 +09:00
Wenchen Fan 4144b6da52 [SPARK-32764][SQL] -0.0 should be equal to 0.0
### What changes were proposed in this pull request?

This is a Spark 3.0 regression introduced by https://github.com/apache/spark/pull/26761. We missed a corner case that `java.lang.Double.compare` treats 0.0 and -0.0 as different, which breaks SQL semantic.

This PR adds back the `OrderingUtil`, to provide custom compare methods that take care of 0.0 vs -0.0

### Why are the changes needed?

Fix a correctness bug.

### Does this PR introduce _any_ user-facing change?

Yes, now `SELECT  0.0 > -0.0` returns false correctly as Spark 2.x.

### How was this patch tested?

new tests

Closes #29647 from cloud-fan/float.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-09-07 20:43:43 -07:00
Max Gekk 954cd9feaa [SPARK-32810][SQL] CSV/JSON data sources should avoid globbing paths when inferring schema
### What changes were proposed in this pull request?
In the PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
* no user specified schema
* some file paths contain escaped glob metacharacters, such as `[``]`, `{``}`, `*` etc.

### Why are the changes needed?
To fix the issue when the follow two queries try to read from paths `[abc].csv` and `[abc].json`:
```scala
spark.read.csv("""/tmp/\[abc\].csv""").show
spark.read.json("""/tmp/\[abc\].json""").show
```
but would end up hitting an exception:
```
org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:392)
```

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added new test cases in `DataFrameReaderWriterSuite`.

Closes #29659 from MaxGekk/globbing-paths-when-inferring-schema.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-08 09:42:59 +09:00
manuzhang c43460cf82 [SPARK-32753][SQL] Only copy tags to node with no tags
### What changes were proposed in this pull request?
Only copy tags to node with no tags when transforming plans.

### Why are the changes needed?
cloud-fan [made a good point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that it doesn't make sense to append tags to existing nodes when nodes are removed. That will cause such bugs as duplicate rows when deduplicating and repartitioning by the same column with AQE.

```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)

// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)

// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)
```

It's too expensive to detect node removal so we make a compromise only to copy tags to node with no tags.

### Does this PR introduce _any_ user-facing change?
Yes. Fix a bug.

### How was this patch tested?
Add test.

Closes #29593 from manuzhang/spark-32753.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-07 16:08:57 +00:00
Zhenhua Wang 04f7f6dac0 [SPARK-32748][SQL] Support local property propagation in SubqueryBroadcastExec
### What changes were proposed in this pull request?

Since [SPARK-22590](2854091d12), local property propagation is supported through `SQLExecution.withThreadLocalCaptured` in both `BroadcastExchangeExec` and `SubqueryExec` when computing `relationFuture`. This pr adds the support in `SubqueryBroadcastExec`.

### Why are the changes needed?

Local property propagation is missed in `SubqueryBroadcastExec`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add a new test.

Closes #29589 from wzhfy/thread_local.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-07 06:26:14 +00:00
ulysses 05fcf26b79 [SPARK-32677][SQL] Load function resource before create
### What changes were proposed in this pull request?

Change `CreateFunctionCommand` code that add class check before create function.

### Why are the changes needed?

We have different behavior between create permanent function and temporary function when function class is invaild. e.g.,
```
create function f as 'test.non.exists.udf';
-- Time taken: 0.104 seconds

create temporary function f as 'test.non.exists.udf'
-- Error in query: Can not load class 'test.non.exists.udf' when registering the function 'f', please make sure it is on the classpath;
```

And Hive also fails both of them.

### Does this PR introduce _any_ user-facing change?

Yes, user will get exception when create a invalid udf.

### How was this patch tested?

New test.

Closes #29502 from ulysses-you/function.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-07 06:00:23 +00:00
Kent Yao de44e9cfa0 [SPARK-32785][SQL] Interval with dangling parts should not results null
### What changes were proposed in this pull request?

bugfix for incomplete interval values, e.g. interval '1', interval '1 day 2', currently these cases will result null, but actually we should fail them with IllegalArgumentsException

### Why are the changes needed?

correctness

### Does this PR introduce _any_ user-facing change?

yes, incomplete intervals will throw exception now

#### before
```
bin/spark-sql -S -e "select interval '1', interval '+', interval '1 day -'"

NULL NULL NULL
```
#### after

```
-- !query
select interval '1'
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException

Cannot parse the INTERVAL value: 1(line 1, pos 7)

== SQL ==
select interval '1'
```

### How was this patch tested?

unit tests added

Closes #29635 from yaooqinn/SPARK-32785.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-07 05:11:30 +00:00
Eren Avsarogullari f5360e761e [SPARK-32548][SQL] - Add Application attemptId support to SQL Rest API
### What changes were proposed in this pull request?
Currently, Spark Public Rest APIs support Application attemptId except SQL API. This causes `no such app: application_X` issue when the application has `attemptId` (e.g: YARN cluster mode).

Please find existing and supported Rest endpoints with attemptId.
```
// Existing Rest Endpoints
applications/{appId}/sql
applications/{appId}/sql/{executionId}

// Rest Endpoints required support
applications/{appId}/{attemptId}/sql
applications/{appId}/{attemptId}/sql/{executionId}
```
Also fixing following compile warning on `SqlResourceSuite`:
```
[WARNING] [Warn] ~/spark/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala:67: Reference to uninitialized value edges
```
### Why are the changes needed?
This causes `no such app: application_X` issue when the application has `attemptId`.

### Does this PR introduce _any_ user-facing change?
Not yet because SQL Rest API is being planned to release with `Spark 3.1`.

### How was this patch tested?
1. New Unit tests are added for existing Rest endpoints. `attemptId` seems not coming in `local-mode` and coming in `YARN cluster mode` so could not be added for `attemptId` case (Suggestions are welcome).
2. Also, patch has been tested manually through both Spark Core and History Server Rest APIs.

Closes #29364 from erenavsarogullari/SPARK-32548.

Authored-by: Eren Avsarogullari <erenavsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-09-06 19:23:12 +08:00
Yuming Wang 0b3bb45b89 [SPARK-32791][SQL] Non-partitioned table metric should not have dynamic partition pruning time
### What changes were proposed in this pull request?

This pr make non-partitioned table metric should not have dynamic partition pruning time.

### Why are the changes needed?

It is useless for non-partitioned table.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test

Before this pr:
![image](https://user-images.githubusercontent.com/5399861/92141803-87fed380-ee45-11ea-9784-09625b246fea.png)
After this pr:
![image](https://user-images.githubusercontent.com/5399861/92141774-7c131180-ee45-11ea-8a9e-6775c592f496.png)

Closes #29641 from wangyum/SPARK-32791.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2020-09-05 23:49:17 +08:00
yangjie 1de272f98d [SPARK-32762][SQL][TEST] Enhance the verification of ExpressionsSchemaSuite to sql-expression-schema.md
### What changes were proposed in this pull request?
`sql-expression-schema.md` automatically generated by `ExpressionsSchemaSuite`, but only expressions entries are checked in `ExpressionsSchemaSuite`. So if we manually modify the contents of the file,  `ExpressionsSchemaSuite` does not necessarily guarantee the correctness of the it some times. For example, [Spark-24884](https://github.com/apache/spark/pull/27507) added `regexp_extract_all`  expression support, and manually modify the `sql-expression-schema.md` but not change the content of `Number of queries` cause file content inconsistency.

Some additional checks have been added to `ExpressionsSchemaSuite` to improve the correctness guarantee of `sql-expression-schema.md` as follow:

- `Number of queries` should equals size of `expressions entries` in `sql-expression-schema.md`

- `Number of expressions that missing example` should equals size of `Expressions missing examples` in `sql-expression-schema.md`

- `MissExamples` from case should same as  `expectedMissingExamples` from `sql-expression-schema.md`

### Why are the changes needed?
Ensure the correctness of `sql-expression-schema.md` content.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Enhanced ExpressionsSchemaSuite

Closes #29608 from LuciferYang/sql-expression-schema.

Authored-by: yangjie <yangjie@MacintoshdeMacBook-Pro.local>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-04 09:40:35 +09:00
Zhenhua Wang e693df2a07 [SPARK-32786][SQL][TEST] Improve performance for some slow DPP tests
### What changes were proposed in this pull request?

The whole `DynamicPartitionPruningSuite` takes about 2 min on my laptop (either AE on or off). The slowest tests are `test("simple inner join triggers DPP with mock-up tables")` and `test("cleanup any DPP filter that isn't pushed down due to expression id clashes")`, which totally take about 1 min.

We can reuse existing test tables or use smaller tables to reduce the cost. After that, the two tests takes only about 1 sec in total, leading to 2x speedup for the suite.

### Why are the changes needed?

To speedup DPP test suites.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified two existing tests.

Closes #29636 from wzhfy/improve_dpp_test.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-09-04 09:33:20 +09:00
Wenchen Fan 76330e0295 [SPARK-32788][SQL] non-partitioned table scan should not have partition filter
### What changes were proposed in this pull request?

This PR fixes a bug `FileSourceStrategy`, which generates partition filters even if the table is not partitioned. This can confuse `FileSourceScanExec`, which mistakenly think the table is partitioned and tries to update the `numPartitions` metrics, and cause a failure. We should not generate partition filters for non-partitioned table.

### Why are the changes needed?

The bug was exposed by https://github.com/apache/spark/pull/29436.

### Does this PR introduce _any_ user-facing change?

Yes, fix a bug.

### How was this patch tested?

new test

Closes #29637 from cloud-fan/refactor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2020-09-03 23:49:17 +08:00
Takeshi Yamamuro a6114d8fb8 [SPARK-32638][SQL] Corrects references when adding aliases in WidenSetOperationTypes
### What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]  <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]
```
In the case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, then the reference in the top `Project` got missing. This PR correct the reference (`exprId` and widen `dataType`) after adding aliases in the rule.

### Why are the changes needed?

bugfixes

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests

Closes #29485 from maropu/SPARK-32638.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-03 14:48:26 +00:00
Peter Toth ffd5227543 [SPARK-32730][SQL] Improve LeftSemi and Existence SortMergeJoin right side buffering
### What changes were proposed in this pull request?

LeftSemi and Existence SortMergeJoin should not buffer all matching right side rows when bound condition is empty, this is unnecessary and can lead to performance degradation especially when spilling happens.

### Why are the changes needed?

Performance improvement.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT and TPCDS benchmarks.

Closes #29572 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-03 14:17:34 +00:00
Ali Afroozeh 0a6043f683 [SPARK-32755][SQL] Maintain the order of expressions in AttributeSet and ExpressionSet
### What changes were proposed in this pull request?
This PR changes `AttributeSet` and `ExpressionSet` to maintain the insertion order of the elements. More specifically, we:
- change the underlying data structure of `AttributeSet` from `HashSet` to `LinkedHashSet` to maintain the insertion order.
- `ExpressionSet` already uses a list to keep track of the expressions, however, since it is extending Scala's immutable.Set class, operations such as map and flatMap are delegated to the immutable.Set itself. This means that the result of these operations is not an instance of ExpressionSet anymore, rather it's a implementation picked up by the parent class. We also remove this inheritance from `immutable.Set `and implement the needed methods directly. ExpressionSet has a very specific semantics and it does not make sense to extend `immutable.Set` anyway.
- change the `PlanStabilitySuite` to not sort the attributes, to be able to catch changes in the order of expressions in different runs.

### Why are the changes needed?
Expressions identity is based on the `ExprId` which is an auto-incremented number. This means that the same query can yield a query plan with different expression ids in different runs. `AttributeSet` and `ExpressionSet` internally use a `HashSet` as the underlying data structure, and therefore cannot guarantee the a fixed order of operations in different runs. This can be problematic in cases we like to check for plan changes in different runs.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passes `PlanStabilitySuite` after regenerating the golden files.

Closes #29598 from dbaliafroozeh/FixOrderOfExpressions.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2020-09-03 13:56:03 +02:00
Yuanjian Li 95f1e9549b [SPARK-32782][SS] Refactor StreamingRelationV2 and move it to catalyst
### What changes were proposed in this pull request?
Move StreamingRelationV2 to the catalyst module and bind with the Table interface.

### Why are the changes needed?
Currently, the StreamingRelationV2 is bind with TableProvider. Since the V2 relation is not bound with `DataSource`, to make it more flexible and have better expansibility, it should be moved to the catalyst module and bound with the Table interface. We did a similar thing for DataSourceV2Relation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT.

Closes #29633 from xuanyuanking/SPARK-32782.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-03 16:04:36 +09:00
Kent Yao 1fba286407 [SPARK-32781][SQL] Non-ASCII characters are mistakenly omitted in the middle of intervals
### What changes were proposed in this pull request?

This PR fails the interval values parsing when they contain non-ASCII characters which are silently omitted right now.

e.g. the case below should be invalid

```
select interval 'interval中文 1 day'
```

### Why are the changes needed?

bugfix, intervals should fail when containing invalid characters

### Does this PR introduce _any_ user-facing change?

yes,

#### before

select interval 'interval中文 1 day'  results 1 day, now it fails with

```
org.apache.spark.sql.catalyst.parser.ParseException

Cannot parse the INTERVAL value: interval中文 1 day
```

### How was this patch tested?

new tests

Closes #29632 from yaooqinn/SPARK-32781.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-03 04:56:40 +00:00
angerszhu 5e6173ebef [SPARK-31670][SQL] Trim unnecessary Struct field alias in Aggregate/GroupingSets
### What changes were proposed in this pull request?
Struct field both in GROUP BY and Aggregate Expresison with CUBE/ROLLUP/GROUPING SET will failed when analysis.

```
test("SPARK-31670") {
  withTable("t1") {
      sql(
        """
          |CREATE TEMPORARY VIEW t(a, b, c) AS
          |SELECT * FROM VALUES
          |('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')),
          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')),
          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')),
          |('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')),
          |('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}'))
        """.stripMargin)

      checkAnswer(
        sql(
          """
            |SELECT a, c.json_string, SUM(b)
            |FROM t
            |GROUP BY a, c.json_string
            |WITH CUBE
            |""".stripMargin),
        Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) :: Row("A", null, 5) ::
          Row("B", "{\"i\": 1}", 1) :: Row("B", null, 1) ::
          Row("C", "{\"i\": 1}", 3) :: Row("C", null, 3) ::
          Row(null, "{\"i\": 1}", 7) :: Row(null, "{\"i\": 2}", 2) :: Row(null, null, 9) :: Nil)

  }
}
```
Error 
```
[info] - SPARK-31670 *** FAILED *** (2 seconds, 857 milliseconds)
[info]   Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 't.`c`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
[info]   Aggregate [a#247, json_string#248, spark_grouping_id#246L], [a#247, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
[info]   +- Expand [List(a#221, b#222, c#223, a#244, json_string#245, 0), List(a#221, b#222, c#223, a#244, null, 1), List(a#221, b#222, c#223, null, json_string#245, 2), List(a#221, b#222, c#223, null, null, 3)], [a#221, b#222, c#223, a#247, json_string#248, spark_grouping_id#246L]
[info]      +- Project [a#221, b#222, c#223, a#221 AS a#244, c#223.json_string AS json_string#245]
[info]         +- SubqueryAlias t
[info]            +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
[info]               +- Project [col1#218, col2#219, col3#220]
[info]                  +- LocalRelation [col1#218, col2#219, col3#220]
[info]
```
For Struct type Field, when we resolve it, it will construct with Alias. When struct field in GROUP BY with CUBE/ROLLUP etc,  struct field in groupByExpression and aggregateExpression will be resolved with different exprId as below
```
'Aggregate [cube(a#221, c#223.json_string AS json_string#240)], [a#221, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
+- SubqueryAlias t
   +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
      +- Project [col1#218, col2#219, col3#220]
         +- LocalRelation [col1#218, col2#219, col3#220]
```
This makes `ResolveGroupingAnalytics.constructAggregateExprs()` failed to replace aggreagteExpression use expand groupByExpression attribute since there exprId is not same. then error happened.

### Why are the changes needed?
Fix analyze bug

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Added UT

Closes #28490 from AngersZhuuuu/SPARK-31670.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-02 13:49:09 +00:00
Zhenhua Wang 03afbc8820 [SPARK-32739][SQL] Support prune right for left semi join in DPP
### What changes were proposed in this pull request?

Currently in DPP, left semi can only prune left, this pr makes it also support prune right.

### Why are the changes needed?

A minor improvement for DPP.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add a test case.

Closes #29582 from wzhfy/dpp_support_leftsemi_pruneRight.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2020-09-02 21:34:49 +08:00
Karol Chmist 7511e43c50 [SPARK-32756][SQL] Fix CaseInsensitiveMap usage for Scala 2.13
### What changes were proposed in this pull request?

This is a follow-up of #29160. This allows Spark SQL project to compile for Scala 2.13.

### Why are the changes needed?

It's needed for #28545

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

I compiled with Scala 2.13. It fails in `Spark REPL` project, which will be fixed by #28545

Closes #29584 from karolchmist/SPARK-32364-scala-2.13.

Authored-by: Karol Chmist <info+github@chmist.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-02 08:27:00 -05:00
angerszhu 55ce49ed28 [SPARK-32400][SQL][TEST][FOLLOWUP][TEST-MAVEN] Fix resource loading error in HiveScripTransformationSuite
### What changes were proposed in this pull request?
#29401 move `test_script.py` from sql/hive module to sql/core module, cause HiveScripTransformationSuite load resource issue.

### Why are the changes needed?
This issue cause jenkins test failed in mvn

spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/
spark-master-test-maven-hadoop-3.2-hive-2.3-jdk-11:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-hive-2.3-jdk-11/
spark-master-test-maven-hadoop-3.2-hive-2.3:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-hive-2.3/
![image](https://user-images.githubusercontent.com/46485123/91681585-71285a80-eb81-11ea-8519-99fc9783d6b9.png)

![image](https://user-images.githubusercontent.com/46485123/91681010-aaf86180-eb7f-11ea-8dbb-61365a3b0ab4.png)

Error as below:
```
 Exception thrown while executing Spark plan:
 HiveScriptTransformation [a#349299, b#349300, c#349301, d#349302, e#349303], python /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/hive/file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar!/test_script.py, [a#349309, b#349310, c#349311, d#349312, e#349313], ScriptTransformationIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),false)
+- Project [_1#349288 AS a#349299, _2#349289 AS b#349300, _3#349290 AS c#349301, _4#349291 AS d#349302, _5#349292 AS e#349303]
   +- LocalTableScan [_1#349288, _2#349289, _3#349290, _4#349291, _5#349292]

 == Exception ==
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18021.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18021.0 (TID 37324) (192.168.10.31 executor driver): org.apache.spark.SparkException: Subprocess exited with status 2. Error: python: can't open file '/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/hive/file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-hive-2.3-jdk-11/sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar!/test_script.py': [Errno 2] No such file or directory

 at org.apache.spark.sql.execution.BaseScriptTransformationExec.checkFailureAndPropagate(BaseScriptTransformationExec.scala:180)
 at org.apache.spark.sql.execution.BaseScriptTransformationExec.checkFailureAndPropagate$(BaseScriptTransformationExec.scala:157)
 at org.apache.spark.sql.hive.execution.HiveScriptTransformationExec.checkFailureAndPropagate(HiveScriptTransformationExec.scala:49)
 at org.apache.spark.sql.hive.execution.HiveScriptTransformationExec$$anon$1.hasNext(HiveScriptTransformationExec.scala:110)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
 at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:127)
 at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1426)
 at o
```
### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Existed UT

Closes #29588 from AngersZhuuuu/SPARK-32400-FOLLOWUP.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-02 18:27:29 +09:00
liwensun f0851e95c6 [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation
### What changes were proposed in this pull request?

PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries.

### Why are the changes needed?

Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This can be problematic for stateful streaming, as this empty batch will not write any state store files, and the next batch will fail when trying to read these state store files and throw a file not found error.

We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries.

This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
unit tests.

Closes #29623 from liwensun/spark-32776.

Authored-by: liwensun <liwen.sun@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-02 18:05:06 +09:00
Yuming Wang 54348dbd21 [SPARK-32767][SQL] Bucket join should work if spark.sql.shuffle.partitions larger than bucket number
### What changes were proposed in this pull request?

Bucket join should work if `spark.sql.shuffle.partitions` larger than bucket number, such as:
```scala
spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
sql("set spark.sql.shuffle.partitions=600")
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
sql("select * from t1 join t2 on t1.id = t2.id").explain()
```

Before this pr:
```
== Physical Plan ==
*(5) SortMergeJoin [id#26L], [id#27L], Inner
:- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#26L, 600), true
:     +- *(1) Filter isnotnull(id#26L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 600), true
      +- *(3) Filter isnotnull(id#27L)
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```

After this pr:
```
== Physical Plan ==
*(4) SortMergeJoin [id#26L], [id#27L], Inner
:- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(id#26L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
+- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#27L, 432), true
      +- *(2) Filter isnotnull(id#27L)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
```

### Why are the changes needed?

Spark 2.4 support this.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29612 from wangyum/SPARK-32767.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-02 04:16:20 +00:00
Kousuke Saruta 812d0918a8 [SPARK-32771][DOCS] The example of expressions.Aggregator in Javadoc / Scaladoc is wrong
### What changes were proposed in this pull request?

This PR modifies an example for `expressions.Aggregator` in Javadoc and Scaladoc.
The definition of `bufferEncoder` and `outputEncoder` are added.

### Why are the changes needed?

To correct the example.
The current example is wrong and doesn't work because `bufferEncoder` and `outputEncoder` are not defined.

### Does this PR introduce _any_ user-facing change?

Yes.
Before this change, the scaladoc and javadoc are like as follows.
![wrong-example-java](https://user-images.githubusercontent.com/4736016/91897528-5ebf3580-ecd5-11ea-8d7b-e846b776ebbb.png)
![wrong-example](https://user-images.githubusercontent.com/4736016/91897509-58c95480-ecd5-11ea-81a3-98774083b689.png)

After this change, the docs are like as follows.
![fixed-example-java](https://user-images.githubusercontent.com/4736016/91897592-78607d00-ecd5-11ea-9e55-03fd9c9c6b54.png)
![fixed-example](https://user-images.githubusercontent.com/4736016/91897609-7c8c9a80-ecd5-11ea-837e-9dbcada6cd53.png)

### How was this patch tested?

Build with `build/sbt unidoc` and confirmed the generated javadoc/scaladoc and got the screenshots above.

Closes #29617 from sarutak/fix-aggregator-doc.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-02 10:03:07 +09:00
Linhong Liu a410658c9b [SPARK-32761][SQL] Allow aggregating multiple foldable distinct expressions
### What changes were proposed in this pull request?
For queries with multiple foldable distinct columns, since they will be eliminated during
execution, it's not mandatory to let `RewriteDistinctAggregates` handle this case. And
in the current code, `RewriteDistinctAggregates` *dose* miss some "aggregating with
multiple foldable distinct expressions" cases.
For example: `select count(distinct 2), count(distinct 2, 3)` will be missed.

But in the planner, this will trigger an error that "multiple distinct expressions" are not allowed.
As the foldable distinct columns can be eliminated finally, we can allow this in the aggregation
planner check.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added test case

Closes #29607 from linhongliu-db/SPARK-32761.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-01 13:04:24 +00:00
Wenchen Fan fea9360ae7 [SPARK-32757][SQL][FOLLOW-UP] Use child's output for canonicalization in SubqueryBroadcastExec
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/29601 , to fix a small mistake in `SubqueryBroadcastExec`. `SubqueryBroadcastExec.doCanonicalize` should canonicalize the build keys with the query output, not the `SubqueryBroadcastExec.output`.

### Why are the changes needed?

fix mistake

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #29610 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-01 12:54:40 +00:00
Huaxin Gao e1dbc85c72 [SPARK-32579][SQL] Implement JDBCScan/ScanBuilder/WriteBuilder
### What changes were proposed in this pull request?
Add JDBCScan, JDBCScanBuilder, JDBCWriteBuilder in Datasource V2 JDBC

### Why are the changes needed?
Complete Datasource V2 JDBC implementation

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
new tests

Closes #29396 from huaxingao/v2jdbc.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-01 07:23:20 +00:00
Wenchen Fan d2a5dad97c [SPARK-32757][SQL] Physical InSubqueryExec should be consistent with logical InSubquery
### What changes were proposed in this pull request?

`InSubquery` can be either single-column mode, or multi-column mode, depending on the output length of the subquery. For multi-column mode, the length of input `values` must match the subquery output length.

However, `InSubqueryExec` doesn't follow it and always be executed under single column mode. It's OK as it's only used by DPP, which looks up one key in one `InSubqueryExec`, so the multi-column mode is not needed. But it's better to make the physical and logical node consistent.

This PR updates `InSubqueryExec` to support multi-column mode, and also fix `SubqueryBroadcastExec` to report output correctly.

### Why are the changes needed?

Fix a potential bug.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #29601 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-01 07:19:43 +00:00
Yuming Wang a701bc79e3 [SPARK-32659][SQL][FOLLOWUP] Improve test for pruning DPP on non-atomic type
### What changes were proposed in this pull request?

Improve test for pruning DPP on non-atomic type:
- Avoid creating new partition tables. This may take 30 seconds..
- Add test `array` type.

### Why are the changes needed?

Improve test.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #29595 from wangyum/SPARK-32659-test.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-09-01 05:51:04 +00:00
Huaxin Gao 806140de40 [SPARK-32592][SQL] Make DataFrameReader.table take the specified options
### What changes were proposed in this pull request?
pass specified options in DataFrameReader.table to JDBCTableCatalog.loadTable

### Why are the changes needed?
Currently, `DataFrameReader.table` ignores the specified options. The options specified like the following are lost.
```
    val df = spark.read
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "3")
      .option("numPartitions", "2")
      .table("h2.test.people")
```
We need to make `DataFrameReader.table` take the specified options.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually test for now. Will add a test after V2 JDBC read is implemented.

Closes #29535 from huaxingao/table_options.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-31 13:21:15 +00:00
Cheng Su ce473b223a [SPARK-32740][SQL] Refactor common partitioning/distribution logic to BaseAggregateExec
### What changes were proposed in this pull request?

For all three different aggregate physical operator: `HashAggregateExec`, `ObjectHashAggregateExec` and `SortAggregateExec`, they have same `outputPartitioning` and `requiredChildDistribution` logic. Refactor these same logic into their super class `BaseAggregateExec` to avoid code duplication and future bugs (similar to `HashJoin` and `ShuffledJoin`).

### Why are the changes needed?

Reduce duplicated code across classes and prevent future bugs if we only update one class but forget another. We already did similar refactoring for join (`HashJoin` and `ShuffledJoin`).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests as this is pure refactoring and no new logic added.

Closes #29583 from c21/aggregate-refactor.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-08-31 15:43:13 +09:00