Commit graph

7335 commits

Author SHA1 Message Date
Liang-Chi Hsieh 2c64b731ae
[SPARK-33259][SS] Disable streaming query with possible correctness issue by default
### What changes were proposed in this pull request?

This patch proposes to disable the streaming query with possible correctness issue in chained stateful operators. The behavior can be controlled by a SQL config, so if users understand the risk and still want to run the query, they can disable the check.
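
As a rough illustration of the escape hatch described above, a user who understands the risk could re-enable such a query as shown below. The config name is an assumption based on this PR's description; verify it against the released documentation.

```scala
// Assumed config name; setting it to false re-enables queries with chained
// stateful operators that would otherwise be blocked by the new check.
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
```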

### Why are the changes needed?

The possible correctness issue in chained stateful operators in a streaming query is not straightforward for users. From the users' perspective, it will be considered a Spark bug. In the worst case, users are not even aware of the correctness issue and consume wrong results.

A better approach is to disable such queries and let users choose to run them only if they understand the risk, instead of implicitly running the query and letting users discover the correctness issue by themselves and report it to the Spark community.

### Does this PR introduce _any_ user-facing change?

Yes. Streaming queries with a possible correctness issue will be blocked from running, unless users explicitly disable the SQL config.

### How was this patch tested?

Unit test.

Closes #30210 from viirya/SPARK-33259.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-11-12 15:31:57 -08:00
Chao Sun cf3b6551ce
[SPARK-33435][SQL] DSv2: REFRESH TABLE should invalidate caches referencing the table
### What changes were proposed in this pull request?

This changes `RefreshTableExec` in DSv2 to also invalidate caches with references to the target table to be refreshed. The change itself is similar to what's done in #30211. Note, though, that since we currently don't support caching a DSv2 table directly, this doesn't add the recache logic from the DSv1 implementation; I marked it as a TODO for now.
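
A hedged sketch of the scenario this fixes, using a hypothetical v2 catalog and table name:

```scala
// Cache a query that references the v2 table.
spark.sql("CACHE TABLE cached_view AS SELECT * FROM testcat.ns.tbl")

// After this change, refreshing the table also invalidates cached_view,
// because it references testcat.ns.tbl.
spark.sql("REFRESH TABLE testcat.ns.tbl")
```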

### Why are the changes needed?

Currently the behavior in DSv1 and DSv2 is inconsistent w.r.t. refreshing a table: in DSv1 we invalidate both the metadata cache and all table caches related to the table, but in DSv2 we only do the former. This addresses the issue and makes the behavior consistent.

### Does this PR introduce _any_ user-facing change?

Yes, refreshing a v2 table now also invalidates all the related caches.

### How was this patch tested?

Added a new UT.

Closes #30359 from sunchao/SPARK-33435.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-11-12 15:22:56 -08:00
gengjiaan 2f07c56810 [SPARK-33278][SQL] Improve the performance for FIRST_VALUE
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/29800 provides a performance improvement for `NTH_VALUE`.
`FIRST_VALUE` also could use the `UnboundedOffsetWindowFunctionFrame` and `UnboundedPrecedingOffsetWindowFunctionFrame`.
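
For example, a query of the following shape (table and column names are illustrative) can benefit from the unbounded offset window frames, mirroring the `NTH_VALUE` example in #29800:

```scala
spark.sql("""
  SELECT FIRST_VALUE(col) OVER (
    ORDER BY rank
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
  FROM tab
""")
```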

### Why are the changes needed?
Improve the performance for `FIRST_VALUE`.

### Does this PR introduce _any_ user-facing change?
 'No'.

### How was this patch tested?
Jenkins test.

Closes #30178 from beliefer/SPARK-33278.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-12 14:59:22 +00:00
xuewei.linxuewei 6d31daeb6a [SPARK-33386][SQL] Accessing array elements in ElementAt/Elt/GetArrayItem should failed if index is out of bound
### What changes were proposed in this pull request?

Instead of returning NULL, `element_at`, `elt`, and `GetArrayItem` now throw a runtime ArrayIndexOutOfBoundsException when ANSI mode is enabled.

### Why are the changes needed?

To follow the ANSI SQL standard when ANSI mode is enabled.

### Does this PR introduce any user-facing change?

When `spark.sql.ansi.enabled` is true, Spark will throw `ArrayIndexOutOfBoundsException` if the index is out of range when accessing array elements.
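
A minimal illustration of the new behavior:

```scala
spark.conf.set("spark.sql.ansi.enabled", "true")

// Index 5 is out of bounds for a 3-element array, so this now throws
// ArrayIndexOutOfBoundsException instead of returning NULL.
spark.sql("SELECT element_at(array(1, 2, 3), 5)").show()
```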

### How was this patch tested?

Added UT and existing UT.

Closes #30297 from leanken/leanken-SPARK-33386.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-12 08:50:32 +00:00
Yuanjian Li 9f983a68f1 [SPARK-30294][SS][FOLLOW-UP] Directly override RDD methods
### Why are the changes needed?
Follow the comment: https://github.com/apache/spark/pull/26935#discussion_r514697997

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test and Mima test.

Closes #30344 from xuanyuanking/SPARK-30294-follow.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-12 12:22:25 +09:00
Max Gekk 7e867298fe
[SPARK-33404][SQL][FOLLOWUP] Update benchmark results for date_trunc
### What changes were proposed in this pull request?
Updated results of `DateTimeBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge (spot instance) |
| AMI | ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1) |
| Java | OpenJDK8/11 installed by`sudo add-apt-repository ppa:openjdk-r/ppa` & `sudo apt install openjdk-11-jdk`|

### Why are the changes needed?
The fix https://github.com/apache/spark/pull/30303 slowed down `date_trunc`. This PR updates benchmark results to have actual info about performance of `date_trunc`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By regenerating benchmark results:
```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.DateTimeBenchmark"
```

Closes #30338 from MaxGekk/fix-trunc_date-benchmark.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-11-11 08:50:43 -08:00
stczwd 1eb236b936 [SPARK-32512][SQL] add alter table add/drop partition command for datasourcev2
### What changes were proposed in this pull request?
This patch is trying to add `AlterTableAddPartitionExec` and `AlterTableDropPartitionExec` with the new table partition API, defined in #28617.

### Does this PR introduce _any_ user-facing change?
Yes. User can use `alter table add partition` or `alter table drop partition` to create/drop partition in V2Table.
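
A hedged example against a hypothetical v2 catalog `testcat` with a partitioned table (partition column and value are illustrative):

```scala
spark.sql("ALTER TABLE testcat.ns.tbl ADD PARTITION (dt = '2020-11-11')")
spark.sql("ALTER TABLE testcat.ns.tbl DROP PARTITION (dt = '2020-11-11')")
```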

### How was this patch tested?
Run suites and fix old tests.

Closes #29339 from stczwd/SPARK-32512-new.

Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@163.com>
Co-authored-by: Jackey Lee <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-11 09:30:42 +00:00
Takeshi Yamamuro 4b367976a8 [SPARK-33417][SQL][TEST] Correct the behaviour of query filters in TPCDSQueryBenchmark
### What changes were proposed in this pull request?

This PR intends to fix the behaviour of query filters in `TPCDSQueryBenchmark`. We can use an option `--query-filter` for selecting TPCDS queries to run, e.g., `--query-filter q6,q8,q13`. But, the current master has a weird behaviour about the option. For example, if we pass `--query-filter q6` so as to run the TPCDS q6 only, `TPCDSQueryBenchmark` runs `q6` and `q6-v2.7` because the `filterQueries` method does not respect the name suffix. So, there is no way now to run the TPCDS q6 only.
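
A minimal sketch of the suffix-aware filtering idea, not the exact code in the patch:

```scala
// Apply the filter against the fully suffixed query name, so that "q6"
// no longer selects "q6-v2.7" as well.
def filterQueries(
    origQueries: Seq[String],
    queryFilter: Set[String],
    nameSuffix: String = ""): Seq[String] = {
  if (queryFilter.nonEmpty) {
    origQueries.filter(name => queryFilter.contains(s"$name$nameSuffix"))
  } else {
    origQueries
  }
}

filterQueries(Seq("q6", "q8"), Set("q6"))                       // Seq("q6")
filterQueries(Seq("q6", "q8"), Set("q6"), nameSuffix = "-v2.7") // Seq()
```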

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked.

Closes #30324 from maropu/FilterBugInTPCDSQueryBenchmark.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-11-11 15:24:05 +09:00
Terry Kim 6d5d030957 [SPARK-33414][SQL] Migrate SHOW CREATE TABLE command to use UnresolvedTableOrView to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate `SHOW CREATE TABLE` to use `UnresolvedTableOrView` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `SHOW CREATE TABLE` works only with a v1 table and a permanent view, and is not supported for v2 tables.

### Why are the changes needed?

The changes allow consistent resolution behavior when resolving the table identifier. For example, the following is the current behavior:
```scala
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
sql("CREATE DATABASE db")
sql("CREATE TABLE t (key INT, value STRING) USING hive")
sql("USE db")
sql("SHOW CREATE TABLE t AS SERDE") // Succeeds
```
With this change, `SHOW CREATE TABLE ... AS SERDE` above fails with the following:
```
org.apache.spark.sql.AnalysisException: t is a temp view not table or permanent view.; line 1 pos 0
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.$anonfun$applyOrElse$43(Analyzer.scala:883)
  at scala.Option.map(Option.scala:230)
```
, which is expected since a temporary view is resolved first and `SHOW CREATE TABLE ... AS SERDE` doesn't support a temporary view.

Note that there is no behavior change for `SHOW CREATE TABLE` without `AS SERDE` since it was already resolving to a temporary view first. See below for more detail.

### Does this PR introduce _any_ user-facing change?

After this PR, `SHOW CREATE TABLE t AS SERDE` is resolved to a temp view `t` instead of table `db.t` in the above scenario.

Note that there is no behavior change for `SHOW CREATE TABLE` without `AS SERDE`, but the exception message changes from `SHOW CREATE TABLE is not supported on a temporary view` to `t is a temp view not table or permanent view`.

### How was this patch tested?

Updated existing tests.

Closes #30321 from imback82/show_create_table.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-11 05:54:27 +00:00
Max Gekk 1e2eeda20e [SPARK-33382][SQL][TESTS] Unify datasource v1 and v2 SHOW TABLES tests
### What changes were proposed in this pull request?
In the PR, I propose to gather common `SHOW TABLES` tests into one trait, `org.apache.spark.sql.execution.command.ShowTablesSuite`, and put datasource-specific tests into `v1.ShowTablesSuite` and `v2.ShowTablesSuite`. Tests for parsing `SHOW TABLES` are also extracted to `ShowTablesParserSuite`.

### Why are the changes needed?
- The unification allows running common `SHOW TABLES` tests for both DSv1 and DSv2
- We can detect missing features and differences between DSv1 and DSv2 implementations.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new test suites:
- `org.apache.spark.sql.execution.command.v1.ShowTablesSuite`
- `org.apache.spark.sql.execution.command.v2.ShowTablesSuite`
- `ShowTablesParserSuite`

Closes #30287 from MaxGekk/unify-dsv1_v2-tests.

Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-11 05:26:46 +00:00
ulysses 5197c5d2e7 [SPARK-33390][SQL] Make Literal support char array
### What changes were proposed in this pull request?

Make Literal support char array.

### Why are the changes needed?

We always use `Literal()` to create foldable values, and `char[]` is a common data type. This change makes it easy to create a string Literal from a `char[]`.
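
A small illustration of the intended usage; the resulting literal being a string value is assumed from this description:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// Before this change, passing a char[] was not supported;
// now it is treated like a string value.
val lit = Literal("Spark".toCharArray)
```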

### Does this PR introduce _any_ user-facing change?

Yes, users can call `Literal()` with a `char[]`.

### How was this patch tested?

Add test.

Closes #30295 from ulysses-you/SPARK-33390.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-11 11:39:11 +09:00
Chao Sun 4934da56bc [SPARK-33305][SQL] DSv2: DROP TABLE command should also invalidate cache
### What changes were proposed in this pull request?

This changes `DropTableExec` to also invalidate caches referencing the table to be dropped, in a cascading manner.

### Why are the changes needed?

In DSv1, the `DROP TABLE` command also invalidates caches, as described in [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765). However, in DSv2 the same command only drops the table but doesn't handle the caches. This could lead to correctness issues.
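
The problematic scenario, sketched with a hypothetical v2 catalog `testcat`:

```scala
spark.sql("CACHE TABLE cached_view AS SELECT * FROM testcat.ns.tbl")

// After this change, dropping the table also invalidates cached_view
// (and any other cache referencing it), in a cascading manner.
spark.sql("DROP TABLE testcat.ns.tbl")
```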

### Does this PR introduce _any_ user-facing change?

Yes. The DSv2 `DROP TABLE` command now also invalidates caches.

### How was this patch tested?

Added a new UT

Closes #30211 from sunchao/SPARK-33305.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-10 14:37:42 +00:00
Yuanjian Li ad02ceda29 [SPARK-33244][SQL] Unify the code paths for spark.table and spark.read.table
### What changes were proposed in this pull request?

- Call `spark.read.table` in `spark.table`.
- Add comments for `spark.table` to emphasize it also support streaming temp view reading.

### Why are the changes needed?
The code paths of `spark.table` and `spark.read.table` should be the same. This behavior was broken in SPARK-32592 since we need to respect options in the `spark.read.table` API.
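
In other words, both entry points below should now build the same plan; the table name and option are illustrative:

```scala
val df1 = spark.table("t")
val df2 = spark.read.option("mergeSchema", "true").table("t")  // options are respected here
```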

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #30148 from xuanyuanking/SPARK-33244.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-10 05:46:45 +00:00
Terry Kim 90f6f39e42 [SPARK-33366][SQL] Migrate LOAD DATA command to use UnresolvedTable to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate `LOAD DATA` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `LOAD DATA` is not supported for v2 tables.

### Why are the changes needed?

The changes allow consistent resolution behavior when resolving the table identifier. For example, the following is the current behavior:
```scala
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
sql("CREATE DATABASE db")
sql("CREATE TABLE t (key INT, value STRING) USING hive")
sql("USE db")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE t") // Succeeds
```
With this change, `LOAD DATA` above fails with the following:
```
org.apache.spark.sql.AnalysisException: t is a temp view not table.; line 1 pos 0
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.$anonfun$applyOrElse$39(Analyzer.scala:865)
    at scala.Option.foreach(Option.scala:407)
```
, which is expected since a temporary view is resolved first and `LOAD DATA` doesn't support a temporary view.

### Does this PR introduce _any_ user-facing change?

After this PR, `LOAD DATA ... t` is resolved to a temp view `t` instead of table `db.t` in the above scenario.

### How was this patch tested?

Updated existing tests.

Closes #30270 from imback82/load_data_cmd.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-10 05:28:06 +00:00
Gengliang Wang a1f84d8714 [SPARK-33369][SQL] DSV2: Skip schema inference in write if table provider supports external metadata
### What changes were proposed in this pull request?

When `TableProvider.supportsExternalMetadata()` is true, Spark will use the input DataFrame's schema in `DataFrameWriter.save()`/`DataStreamWriter.start()` and skip schema/partitioning inference.

### Why are the changes needed?

For all the v2 data sources which are not FileDataSourceV2, Spark always infers the table schema/partitioning on `DataFrameWriter.save()`/`DataStreamWriter.start()`.
The inference of table schema/partitioning can be expensive. However, there is no trait or flag indicating that a V2 source can use the input DataFrame's schema on `DataFrameWriter.save()`/`DataStreamWriter.start()`. We can resolve the problem by adding a new expected behavior to the method `TableProvider.supportsExternalMetadata()`.
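
A minimal sketch of a v2 source opting in; the method signatures follow the public `TableProvider` interface, while the class body is illustrative:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class MyV2Source extends TableProvider {
  // Tell Spark it may pass the input DataFrame's schema on the write path
  // instead of calling inferSchema().
  override def supportsExternalMetadata(): Boolean = true

  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    throw new UnsupportedOperationException("schema must be provided externally")

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    // Build a Table using the externally provided schema (omitted here).
    ???
  }
}
```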

### Does this PR introduce _any_ user-facing change?

Yes, a new behavior for the data source v2 API `TableProvider.supportsExternalMetadata()` when it returns true.

### How was this patch tested?

Unit test

Closes #30273 from gengliangwang/supportsExternalMetadata.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-10 04:43:32 +00:00
Gabor Somogyi 4ac8133866 [SPARK-33223][SS][UI] Structured Streaming Web UI state information
### What changes were proposed in this pull request?
The Structured Streaming UI does not contain state information. This PR adds it.

### Why are the changes needed?
Missing state information.

### Does this PR introduce _any_ user-facing change?
Additional UI elements appear.

### How was this patch tested?
Existing unit tests + manual test.
<img width="1044" alt="Screenshot 2020-10-30 at 15 14 21" src="https://user-images.githubusercontent.com/18561820/97715405-a1797000-1ac2-11eb-886a-e3e6efa3af3e.png">

Closes #30151 from gaborgsomogyi/SPARK-33223.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-11-10 11:22:35 +09:00
Peter Toth 84dc374611 [SPARK-33303][SQL] Deduplicate deterministic PythonUDF calls
### What changes were proposed in this pull request?
This PR modifies the `ExtractPythonUDFs` rule to deduplicate deterministic PythonUDF calls.

Before this PR the dataframe: `df.withColumn("c", batchedPythonUDF(col("a"))).withColumn("d", col("c"))` has the plan:
```
*(1) Project [value#1 AS a#4, pythonUDF1#15 AS c#7, pythonUDF1#15 AS d#10]
+- BatchEvalPython [dummyUDF(value#1), dummyUDF(value#1)], [pythonUDF0#14, pythonUDF1#15]
   +- LocalTableScan [value#1]
```
After this PR the deterministic PythonUDF calls are deduplicated:
```
*(1) Project [value#1 AS a#4, pythonUDF0#14 AS c#7, pythonUDF0#14 AS d#10]
+- BatchEvalPython [dummyUDF(value#1)], [pythonUDF0#14]
   +- LocalTableScan [value#1]
```

### Why are the changes needed?
To fix a performance issue.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New and existing UTs.

Closes #30203 from peter-toth/SPARK-33303-deduplicate-deterministic-udf-calls.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-09 19:27:36 +09:00
Linhong Liu 4e1c89400d [SPARK-33140][SQL][FOLLOW-UP] Use sparkSession in AQE context when applying rules
### What changes were proposed in this pull request?
After #30097, all rules are using `SparkSession.active` to get `SQLConf`
and `SparkSession`. But in AQE, when applying the rules for the initial plan,
we should use the spark session in AQE context.

### Why are the changes needed?
Fix potential problem caused by using the wrong spark session

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing ut

Closes #30294 from linhongliu-db/SPARK-33140-followup.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-09 09:44:58 +00:00
Yuming Wang 7a5647a93a [SPARK-33385][SQL] Support bucket pruning for IsNaN
### What changes were proposed in this pull request?

This PR adds support for bucket pruning on the `IsNaN` predicate.
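
A hedged illustration of the kind of query that can now prune buckets; the table and column names are made up:

```scala
spark.sql("SELECT cast(id AS double) AS a, id AS b FROM range(10000)")
  .write.bucketBy(100, "a").saveAsTable("t_double")

// Only the single bucket that NaN hashes into needs to be scanned.
spark.sql("SELECT * FROM t_double WHERE isnan(a)").show()
```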

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #30291 from wangyum/SPARK-33385.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-09 09:20:31 +00:00
Yuming Wang 69799c514f [SPARK-33372][SQL] Fix InSet bucket pruning
### What changes were proposed in this pull request?

This PR fixes `InSet` bucket pruning, because its values are not `Literal`s:
cbd3fdea62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala (L253-L255)

### Why are the changes needed?

Fix bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and manual test:

```scala
spark.sql("select id as a, id as b from range(10000)").write.bucketBy(100, "a").saveAsTable("t")
spark.sql("select * from t where a in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)").show
```

Before this PR | After this PR
-- | --
![image](https://user-images.githubusercontent.com/5399861/98380788-fb120980-2083-11eb-8fae-4e21ad873e9b.png) | ![image](https://user-images.githubusercontent.com/5399861/98381095-5ba14680-2084-11eb-82ca-2d780c85305c.png)

Closes #30279 from wangyum/SPARK-33372.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-09 08:32:51 +00:00
Wenchen Fan 98730b7ee2 [SPARK-33087][SQL] DataFrameWriterV2 should delegate table resolution to the analyzer
### What changes were proposed in this pull request?

This PR makes `DataFrameWriterV2` to create query plans with `UnresolvedRelation` and leave the table resolution work to the analyzer.

### Why are the changes needed?

Table resolution work should be done by the analyzer. After this PR, the behavior is more consistent between different APIs (DataFrameWriter, DataFrameWriterV2 and SQL). See the next section for behavior changes.

### Does this PR introduce _any_ user-facing change?

Yes.
1. writes to a temp view of v2 relation: previously it fails with table not found exception, now it works if the v2 relation is writable. This is consistent with `DataFrameWriter` and SQL INSERT.
2. writes to other temp views: previously it fails with table not found exception, now it fails with a more explicit error message, saying that writing to a temp view of non-v2-relation is not allowed.
3. writes to a view: previously it fails with table not writable error, now it fails with a more explicit error message, saying that writing to a view is not allowed.
4. writes to a v1 table: previously it fails with table not writable error, now it fails with a more explicit error message, saying that writing to a v1 table is not allowed. (We can allow it later, by falling back to v1 command)

### How was this patch tested?

new tests

Closes #29970 from cloud-fan/refactor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-09 08:08:00 +00:00
Huaxin Gao bfb257f078 [SPARK-32405][SQL] Apply table options while creating tables in JDBC Table Catalog
### What changes were proposed in this pull request?
Currently in JDBCTableCatalog, we ignore the table options when creating a table.
```
    // TODO (SPARK-32405): Apply table options while creating tables in JDBC Table Catalog
    if (!properties.isEmpty) {
      logWarning("Cannot create JDBC table with properties, these properties will be " +
        "ignored: " + properties.asScala.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]"))
    }
```

### Why are the changes needed?
We need to apply the table options when creating a table.
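
A hedged usage example, assuming a JDBC v2 catalog registered as `mysql` and that `TBLPROPERTIES` are what get forwarded as table options; the property shown is dialect-specific and purely illustrative:

```scala
// After this change, the properties are passed to the underlying CREATE TABLE
// instead of being dropped with a warning.
spark.sql("""
  CREATE TABLE mysql.db.people (id INT, name STRING)
  TBLPROPERTIES ('ENGINE'='InnoDB')
""")
```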

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add new test

Closes #30154 from huaxingao/table_options.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-09 07:02:14 +00:00
Liang-Chi Hsieh c269b53f07 [SPARK-33384][SS] Delete temporary file when cancelling writing to final path even underlying stream throwing error
### What changes were proposed in this pull request?

In `RenameBasedFSDataOutputStream.cancel`, we do two things in a single try/catch block: close the underlying stream and delete the temporary file. Closing the `OutputStream` could throw an `IOException`, in which case we may skip deleting the temporary file.

This patch proposes to delete the temporary file even when the underlying stream throws an error.
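
A minimal sketch of the idea; names and types are illustrative, not the actual implementation:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Ensure the temporary file is deleted even if closing the stream throws.
def cancel(fs: FileSystem, tempPath: Path, underlying: java.io.OutputStream): Unit = {
  try {
    underlying.close()
  } finally {
    fs.delete(tempPath, false)
  }
}
```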

### Why are the changes needed?

To avoid leaving temporary files behind when canceling a write in `RenameBasedFSDataOutputStream`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #30290 from viirya/SPARK-33384.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-08 18:44:26 -08:00
yangjie01 02fd52cfbc [SPARK-33352][CORE][SQL][SS][MLLIB][AVRO][K8S] Fix procedure-like declaration compilation warnings in Scala 2.13
### What changes were proposed in this pull request?
There are two similar compilation warnings about procedure-like declaration in Scala 2.13:

```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:70: procedure syntax is deprecated for constructors: add `=`, as in method definition
```
and

```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:211: procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `run`'s return type
```

This PR is the first part of resolving SPARK-33352 (a small sketch follows this list):

- For constructor definitions, add `=` to convert to function syntax

- For methods without a declared return type, add `: Unit =` to convert to function syntax
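
A tiny sketch of the two fixes:

```scala
class Worker(name: String) {
  // Deprecated procedure syntax:   def this() { this("default") }
  // Function syntax (2.12 and 2.13):
  def this() = { this("default") }

  // Deprecated procedure syntax:   def run() { println(name) }
  // Explicit `: Unit =` return type:
  def run(): Unit = { println(name) }
}
```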

### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13; this change should be compatible with Scala 2.12.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes #30255 from LuciferYang/SPARK-29392-FOLLOWUP.1.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-11-08 12:51:48 -06:00
Terry Kim 68c032c246 [SPARK-33364][SQL] Introduce the "purge" option in TableCatalog.dropTable for v2 catalog
### What changes were proposed in this pull request?

This PR proposes to introduce the `purge` option in `TableCatalog.dropTable` so that v2 catalogs can use the option if needed.

Related discussion: https://github.com/apache/spark/pull/30079#discussion_r510594110

### Why are the changes needed?

Spark DDL supports passing the purge option to `DROP TABLE` command. However, the option is not used (ignored) for v2 catalogs.
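
For context, this is the SQL form that carries the option; with a hypothetical v2 catalog `testcat`, the flag can now be passed through to the catalog instead of being ignored:

```scala
spark.sql("DROP TABLE testcat.ns.tbl PURGE")
```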

### Does this PR introduce _any_ user-facing change?

This PR introduces a new API in `TableCatalog`.

### How was this patch tested?

Added a test.

Closes #30267 from imback82/purge_table.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-05 22:00:45 -08:00
Prashant Sharma 733a468726 [SPARK-33130][SQL] Support ALTER TABLE in JDBC v2 Table Catalog: add, update type and nullability of columns (MsSqlServer dialect)
### What changes were proposed in this pull request?

Override the default SQL strings for:
- ALTER TABLE RENAME COLUMN
- ALTER TABLE UPDATE COLUMN NULLABILITY
in the MsSqlServer JDBC dialect, according to the official documentation.
Write MsSqlServer integration tests for JDBC.
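
A hedged sketch of the T-SQL strings such a dialect would emit, following Microsoft's documented syntax; table and column names are illustrative:

```scala
// Rename a column (SQL Server uses the sp_rename stored procedure).
val renameColumnSql =
  "EXEC sp_rename 'dbo.people.fname', 'first_name', 'COLUMN'"

// Change column nullability (the full column type must be restated).
val updateNullabilitySql =
  "ALTER TABLE dbo.people ALTER COLUMN first_name VARCHAR(64) NOT NULL"
```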

### Why are the changes needed?

To add the support for alter table when interacting with MSSql Server.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

added tests

Closes #30038 from ScrapCodes/mssql-dialect.

Authored-by: Prashant Sharma <prashsh1@in.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-06 05:46:38 +00:00
Wenchen Fan d16311051d [SPARK-32934][SQL][FOLLOW-UP] Refine class naming and code comments
### What changes were proposed in this pull request?

1. Rename `OffsetWindowSpec` to `OffsetWindowFunction`, as it's the base class for all offset based window functions.
2. Refine and add more comments.
3. Remove `isRelative` as it's useless.

### Why are the changes needed?

code refinement

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #30261 from cloud-fan/window.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-06 05:20:25 +00:00
Wenchen Fan cd4e3d3b0c [SPARK-33360][SQL] Simplify DS v2 write resolution
### What changes were proposed in this pull request?

Removing duplicated code in `ResolveOutputRelation`, by adding `V2WriteCommand.withNewQuery`

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #30264 from cloud-fan/ds-minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-05 15:44:04 -08:00
Jungtaek Lim (HeartSaVioR) 21413b7dd4 [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore
### What changes were proposed in this pull request?

There's a concept of 'read-only' and 'read+write' state stores in Spark which is defined "implicitly". Spark doesn't prevent writes to a 'read-only' state store; Spark just assumes a read-only stateful operator will not modify the state store. Given it's not defined explicitly, the state store instance has to be implemented as 'read+write' even when it's being used as 'read-only', which sometimes brings confusion.

For example, abort() in HDFSBackedStateStore - d38f816748/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala (L143-L155)

The comment sounds as if the statement works differently between 'read-only' and 'read+write', but that's not true as both state stores have their state initialized as UPDATING (no difference). So the 'read-only' state also creates the temporary file, initializes output streams to write to the temporary file, closes the output streams, and finally deletes the temporary file. These unnecessary operations are done per batch/partition.

This patch explicitly defines a 'read-only' StateStore, and enables the state store provider to create a 'read-only' StateStore instance if requested. Relevant code paths are modified, and a 'read-only' StateStore implementation for HDFSBackedStateStore is introduced. The new implementation gets rid of the unnecessary operations explained above.

From a backward-compatibility point of view, the only thing changed on the public API side is `StateStoreProvider`. The trait `StateStoreProvider` has to be changed to allow requesting a 'read-only' StateStore; this patch adds a default implementation which leverages the 'read+write' StateStore but wraps it with a 'write-protected' StateStore instance, so that custom providers don't need to change their code to reflect the change. But if providers can optimize for the read-only workload, they'll be happy to make a change.

Please note that this patch makes ReadOnlyStateStore extend StateStore and be referred to as StateStore, as StateStore is used in so many places and it's not easy to support both traits if we differentiate them. So unfortunately the write methods are still exposed on the read-only state; they just throw UnsupportedOperationException.
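
A minimal, hypothetical sketch of the 'write-protected' wrapper idea described above; the names are illustrative and not the actual Spark API:

```scala
trait SimpleStore {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
  def commit(): Unit
}

// Wrap a read+write store so that accidental writes fail fast, letting
// existing providers serve read-only requests without code changes.
class WriteProtectedStore(underlying: SimpleStore) extends SimpleStore {
  override def get(key: String): Option[String] = underlying.get(key)
  override def put(key: String, value: String): Unit =
    throw new UnsupportedOperationException("Cannot put into a read-only store")
  override def commit(): Unit =
    throw new UnsupportedOperationException("Cannot commit a read-only store")
}
```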

### Why are the changes needed?

The new API opens the chance to optimize read-only state store instance compared with read+write state store instance. HDFSBackedStateStoreProvider is modified to provide read-only version of state store which doesn't deal with temporary file as well as state machine.

### Does this PR introduce any user-facing change?

Clearly "no" for most end users, and also "no" for custom state store providers as it doesn't touch trait `StateStore` as well as provides default implementation for added method in trait `StateStoreProvider`.

### How was this patch tested?

Modified UT. Existing UTs ensure the change doesn't break anything.

Closes #26935 from HeartSaVioR/SPARK-30294.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-11-05 18:21:17 +09:00
HyukjinKwon d530ed0ea8 Revert "[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends"
This reverts commit b8a440f098.
2020-11-05 16:15:17 +09:00
Dongjoon Hyun 42c0b175ce [SPARK-33338][SQL] GROUP BY using literal map should not fail
### What changes were proposed in this pull request?

This PR aims to fix `semanticEquals` works correctly on `GetMapValue` expressions having literal maps with `ArrayBasedMapData` and `GenericArrayData`.

### Why are the changes needed?

This is a regression from Apache Spark 1.6.x.
```scala
scala> sc.version
res1: String = 1.6.3

scala> sqlContext.sql("SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k]").show
+---+
|_c0|
+---+
| v1|
+---+
```

Apache Spark 2.x ~ 3.0.1 raise`RuntimeException` for the following queries.
```sql
CREATE TABLE t USING ORC AS SELECT map('k1', 'v1') m, 'k1' k
SELECT map('k1', 'v1')[k] FROM t GROUP BY 1
SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k]
SELECT map('k1', 'v1')[k] a FROM t GROUP BY a
```

**BEFORE**
```scala
Caused by: java.lang.RuntimeException: Couldn't find k#3 in [keys: [k1], values: [v1][k#3]#6]
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:85)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:79)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
```

**AFTER**
```sql
spark-sql> SELECT map('k1', 'v1')[k] FROM t GROUP BY 1;
v1
Time taken: 1.278 seconds, Fetched 1 row(s)
spark-sql> SELECT map('k1', 'v1')[k] FROM t GROUP BY map('k1', 'v1')[k];
v1
Time taken: 0.313 seconds, Fetched 1 row(s)
spark-sql> SELECT map('k1', 'v1')[k] a FROM t GROUP BY a;
v1
Time taken: 0.265 seconds, Fetched 1 row(s)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #30246 from dongjoon-hyun/SPARK-33338.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-04 08:35:10 -08:00
Terry Kim 0ad35ba5f8 [SPARK-33321][SQL] Migrate ANALYZE TABLE commands to use UnresolvedTableOrView to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate `ANALYZE TABLE` and `ANALYZE TABLE ... FOR COLUMNS` to use `UnresolvedTableOrView` to resolve the table/view identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `ANALYZE TABLE` is not supported for v2 tables.

### Why are the changes needed?

The changes allow consistent resolution behavior when resolving the table/view identifier. For example, the following is the current behavior:
```scala
sql("create temporary view t as select 1")
sql("create database db")
sql("create table db.t using csv as select 1")
sql("use db")
sql("ANALYZE TABLE t compute statistics") // Succeeds
```
With this change, ANALYZE TABLE above fails with the following:
```
    org.apache.spark.sql.AnalysisException: t is a temp view not table or permanent view.; line 1 pos 0
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.$anonfun$applyOrElse$40(Analyzer.scala:872)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.applyOrElse(Analyzer.scala:870)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.applyOrElse(Analyzer.scala:856)
```
, which is expected since a temporary view is resolved first and ANALYZE TABLE doesn't support a temporary view.

### Does this PR introduce _any_ user-facing change?

After this PR, `ANALYZE TABLE t` is resolved to a temp view `t` instead of table `db.t`.

### How was this patch tested?

Updated existing tests.

Closes #30229 from imback82/parse_v1table.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-04 06:50:37 +00:00
Wenchen Fan 034070a23a Revert "[SPARK-33248][SQL] Add a configuration to control the legacy behavior of whether need to pad null value when value size less then schema size"
This reverts commit 0c943cd2fb.
2020-11-04 12:30:38 +08:00
Chao Sun d900c6ff49 [SPARK-33293][SQL][FOLLOW-UP] Rename TableWriteExec to TableWriteExecHelper
### What changes were proposed in this pull request?

Rename `TableWriteExec` in `WriteToDataSourceV2Exec.scala` to `TableWriteExecHelper`.

### Why are the changes needed?

See [discussion](https://github.com/apache/spark/pull/30193#discussion_r516412653). The former is too general.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #30235 from sunchao/SPARK-33293-2.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-03 14:53:01 -08:00
Max Gekk bdabf60fb4 [SPARK-33299][SQL][DOCS] Don't mention schemas in JSON format in docs for from_json
### What changes were proposed in this pull request?
Remove the JSON formatted schema from comments for `from_json()` in Scala/Python APIs.

Closes #30201

### Why are the changes needed?
Schemas in JSON format are internal (not documented). They shouldn't be recommended for usage.
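
The documented alternatives are a DDL-formatted string or a `StructType`, e.g. (assuming `df` is a DataFrame with a string column named `value`):

```scala
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// A DDL-formatted schema instead of the internal JSON format.
val parsed = df.select(from_json($"value", "a INT, b STRING", Map.empty[String, String]))
```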

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By linters.

Closes #30226 from MaxGekk/from_json-common-schema-parsing-2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-11-02 10:10:24 -08:00
Cheng Su e52b858ef7 [SPARK-33027][SQL] Add DisableUnnecessaryBucketedScan rule to AQE
### What changes were proposed in this pull request?

As a followup comment from https://github.com/apache/spark/pull/29804#issuecomment-700650620, here we add the physical plan rule DisableUnnecessaryBucketedScan into AQE AdaptiveSparkPlanExec.queryStagePreparationRules, to make auto bucketed scan work with AQE.

The change is mostly in:
* `AdaptiveSparkPlanExec.scala`: add physical plan rule `DisableUnnecessaryBucketedScan`
* `DisableUnnecessaryBucketedScan.scala`: propagate logical plan link for the file source scan exec operator, otherwise we lose the logical plan link information when AQE is enabled, and will get exception [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L176). (for example, for query `SELECT * FROM bucketed_table` with AQE is enabled)
* `DisableUnnecessaryBucketedScanSuite.scala`: add new test suite for AQE enabled - `DisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE`, and changed some of tests to use `AdaptiveSparkPlanHelper.find/collect`, to make the plan verification work when AQE enabled.

### Why are the changes needed?

It's reasonable to add support for disabling unnecessary bucketed scans when AQE is enabled; this helps optimize such queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `DisableUnnecessaryBucketedScanSuite`.

Closes #30200 from c21/auto-bucket-aqe.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-02 06:44:07 +00:00
Prashant Sharma 6226ccc092 [SPARK-33095] Follow up, support alter table column rename
### What changes were proposed in this pull request?

Support column rename for the MySQL dialect.

### Why are the changes needed?

At the moment, it does not work for MySQL version 5.x, so we should throw a proper exception for that case.
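
For reference, the difference between the two server versions; the statements follow MySQL's documented syntax, and the table/column names are hypothetical:

```scala
// MySQL 8.0 supports the simple form used by the dialect:
val mysql8 = "ALTER TABLE people RENAME COLUMN fname TO first_name"

// MySQL 5.x only offers CHANGE, which requires restating the column type,
// so the dialect raises a clear error instead of emitting unsupported SQL.
val mysql5 = "ALTER TABLE people CHANGE fname first_name VARCHAR(64)"
```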

### Does this PR introduce _any_ user-facing change?

Yes, `column rename` with mysql dialect should work correctly.

### How was this patch tested?

Added tests for rename column.
Ran the tests to pass with both versions of mysql.

* `export MYSQL_DOCKER_IMAGE_NAME=mysql:5.7.31`

* `export MYSQL_DOCKER_IMAGE_NAME=mysql:8.0`

Closes #30142 from ScrapCodes/mysql-dialect-rename.

Authored-by: Prashant Sharma <prashsh1@in.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-11-02 05:03:41 +00:00
Takuya UESHIN b8a440f098 [SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends
### What changes were proposed in this pull request?

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash.

E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because, the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30177 from ueshin/issues/SPARK-33277/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-01 20:28:12 +09:00
wangguangxin.cn 69c27f49ac [SPARK-33306][SQL] Timezone is needed when cast date to string
### What changes were proposed in this pull request?
When `spark.sql.legacy.typeCoercion.datetimeToString.enabled` is enabled, Spark will cast a date to a string when comparing a date with a string. In Spark 3, a timezone is needed when casting date to string, as seen at 72ad9dcd5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (L309).

However, the timezone may not be set because `CastBase.needsTimeZone` returns false for this kind of casting.

A simple way to reproduce this is
```
spark-shell --conf spark.sql.legacy.typeCoercion.datetimeToString.enabled=true

```
when we execute the following sql,
```
select a.d1 from
(select to_date(concat('2000-01-0', id)) as d1 from range(1, 2)) a
join
(select concat('2000-01-0', id) as d2 from range(1, 2)) b
on a.d1 = b.d2
```
it will throw
```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:529)
  at scala.None$.get(Option.scala:527)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:56)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:56)
  at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:253)
  at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:253)
  at org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:287)
  at org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:287)
```

### Why are the changes needed?
As described above, this is a bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add more UT

Closes #30213 from WangGuangxin/SPARK-33306.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-31 15:14:46 -07:00
Chao Sun c51e5fc14b [SPARK-33293][SQL] Refactor WriteToDataSourceV2Exec and reduce code duplication
### What changes were proposed in this pull request?

Refactor `WriteToDataSourceV2Exec` via removing code duplication around write to table logic:
- renamed `AtomicTableWriteExec` to `TableWriteExec` so that the table write logic in this trait can be modified and shared with `CreateTableAsSelectExec`, `ReplaceTableAsSelectExec`, `AtomicCreateTableAsSelectExec ` and `AtomicReplaceTableAsSelectExec`.
- similar to the above, renamed `writeToStagedTable` to `writeToTable` in `TableWriteExec`.
- extended `writeToTable` so that it can handle both staged table as well as non-staged table.

### Why are the changes needed?

Simplify the logic and remove duplication, to make this piece of code easier to maintain.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass CIs with the existing test coverage.

Closes #30193 from sunchao/SPARK-33293.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-31 10:01:31 -07:00
Chao Sun 32b78d3795 [SPARK-33290][SQL] REFRESH TABLE should invalidate cache even though the table itself may not be cached
### What changes were proposed in this pull request?

In `CatalogImpl.refreshTable`, this moves the `uncacheQuery` call out of the condition `if (cache.nonEmpty)` so that it will be called whether the table itself is cached or not.

### Why are the changes needed?

In the case like the following:
```sql
CREATE TABLE t ...;
CREATE VIEW t1 AS SELECT * FROM t;
REFRESH TABLE t;
```

If the table `t` is refreshed, the view `t1` which is depending on `t` will not be invalidated. This could lead to incorrect result and is similar to [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765).

On the other hand, if we have:

```sql
CREATE TABLE t ...;
CACHE TABLE t;
CREATE VIEW t1 AS SELECT * FROM t;
REFRESH TABLE t;
```

Then the view `t1` will be refreshed. The behavior is somewhat inconsistent.

### Does this PR introduce _any_ user-facing change?

Yes. With the change, any cache depending on the refreshed table will be invalidated. Previously this only happened if the table itself was cached.

### How was this patch tested?

Added a new UT for the case.

Closes #30187 from sunchao/SPARK-33290.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-31 09:49:18 -07:00
ulysses d59f6a7095 [SPARK-33294][SQL] Add query resolved check before analyze InsertIntoDir
### What changes were proposed in this pull request?

Add `query.resolved` before analyze `InsertIntoDir`.

### Why are the changes needed?

For a better error message.
```
INSERT OVERWRITE DIRECTORY '/tmp/file' USING PARQUET
SELECT * FROM (
 SELECT c3 FROM (
  SELECT c1, c2 from values(1,2) t(c1, c2)
  )
)
```
Before this PR, we get the following error message:
```
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: *
  at org.apache.spark.sql.catalyst.analysis.Star.toAttribute(unresolved.scala:244)
  at org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52)
  at org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
```

### Does this PR introduce _any_ user-facing change?

Yes, error msg changed.

### How was this patch tested?

New test.

Closes #30197 from ulysses-you/SPARK-33294.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-30 08:18:10 +00:00
angerszhu 0c943cd2fb [SPARK-33248][SQL] Add a configuration to control the legacy behavior of whether need to pad null value when value size less then schema size
### What changes were proposed in this pull request?
Add a configuration to control the legacy behavior of whether null values need to be padded when the value size is less than the schema size.
Since we can't decide whether it's a bug, and some users need the behavior to be the same as Hive's.

### Why are the changes needed?
Provides a compatible choice between the historical behavior and Hive's behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed UT

Closes #30156 from AngersZhuuuu/SPARK-33284.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-30 14:11:25 +09:00
Max Gekk 343e0bb3ad [SPARK-33286][SQL] Improve the error message about schema parsing by from_json/from_csv
### What changes were proposed in this pull request?
In the PR, I propose to improve the error message from `from_json`/`from_csv` by combining errors from all schema parsers:
- DataType.fromJson (except CSV)
- CatalystSqlParser.parseDataType
- CatalystSqlParser.parseTableSchema

Before the changes, `from_json` does not show error messages from the first parser in the chain, which could mislead users.

### Why are the changes needed?
Currently, `from_json` outputs the error message from the fallback schema parser which can confuse end-users. For example:

```scala
    val invalidJsonSchema = """{"fields": [{"a":123}], "type": "struct"}"""
    df.select(from_json($"json", invalidJsonSchema, Map.empty[String, String])).show()
```
The JSON schema has an issue in `{"a":123}` but the error message doesn't point it out:
```
mismatched input '{' expecting {'ADD', 'AFTER', ...}(line 1, pos 0)

== SQL ==
{"fields": [{"a":123}], "type": "struct"}
^^^

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '{' expecting {'ADD', 'AFTER',  ... }(line 1, pos 0)

== SQL ==
{"fields": [{"a":123}], "type": "struct"}
^^^
```

### Does this PR introduce _any_ user-facing change?
Yes, after the changes for the example above:
```
Cannot parse the schema in JSON format: Failed to convert the JSON string '{"a":123}' to a field.
Failed fallback parsing: Cannot parse the data type:
mismatched input '{' expecting {'ADD', 'AFTER', ...}(line 1, pos 0)

== SQL ==
{"fields": [{"a":123}], "type": "struct"}
^^^

Failed fallback parsing:
mismatched input '{' expecting {'ADD', 'AFTER', ...}(line 1, pos 0)

== SQL ==
{"fields": [{"a":123}], "type": "struct"}
^^^
```

### How was this patch tested?
- By existing tests suites like `JsonFunctionsSuite` and `JsonExpressionsSuite`.
- Add new test to `JsonFunctionsSuite`.
- Re-gen results for `json-functions.sql`.

Closes #30183 from MaxGekk/fromDDL-error-msg.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-30 11:18:47 +09:00
luluorta cbd3fdea62 [SPARK-33008][SQL] Division by zero on divide-like operations returns incorrect result
### What changes were proposed in this pull request?
In ANSI mode, when a division by zero occurs performing a divide-like operation (Divide, IntegralDivide, Remainder or Pmod), we are returning an incorrect value. Instead, we should throw an exception, as stated in the SQL standard.
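
A quick illustration of the expected ANSI-mode behavior after this change:

```scala
spark.conf.set("spark.sql.ansi.enabled", "true")

// Each of these now raises an arithmetic error instead of returning an incorrect value.
spark.sql("SELECT 1 / 0").show()
spark.sql("SELECT 1 div 0").show()
spark.sql("SELECT 1 % 0").show()
spark.sql("SELECT pmod(1, 0)").show()
```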

### Why are the changes needed?
The result is incorrect.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
added UT + existing UTs (improved)

Closes #29882 from luluorta/SPARK-33008.

Authored-by: luluorta <luluorta@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-29 16:44:17 +00:00
Liang-Chi Hsieh 056b62264b [SPARK-33263][SS] Configurable StateStore compression codec
### What changes were proposed in this pull request?

This patch proposes to make StateStore compression codec configurable.

### Why are the changes needed?

Currently the compression codec of StateStore is not configurable and is hard-coded to lz4. It is better if we follow other Spark modules and make the compression codec of StateStore configurable. For example, we can choose the zstd codec, and zstd is configurable with different compression levels.
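
A hedged usage example; the config key is assumed from this PR's description and may differ in the released version:

```scala
// Switch the StateStore codec from the previously hard-coded lz4 to zstd.
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "zstd")
```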

### Does this PR introduce _any_ user-facing change?

Yes, after this change users can configure a different codec for StateStore.

### How was this patch tested?

Unit test.

Closes #30162 from viirya/SPARK-33263.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-29 07:44:44 -07:00
Max Gekk b409025641 [SPARK-33281][SQL] Return SQL schema instead of Catalog string from the SchemaOfCsv expression
### What changes were proposed in this pull request?
Return schema in SQL format instead of Catalog string from the SchemaOfCsv expression.

### Why are the changes needed?
To unify output of the `schema_of_json()` and `schema_of_csv()`.

### Does this PR introduce _any_ user-facing change?
Yes, but `schema_of_csv()` is usually used in combination with `from_csv()`, so the schema format shouldn't matter much.

Before:
```
> SELECT schema_of_csv('1,abc');
  struct<_c0:int,_c1:string>
```

After:
```
> SELECT schema_of_csv('1,abc');
  STRUCT<`_c0`: INT, `_c1`: STRING>
```

### How was this patch tested?
By existing test suites `CsvFunctionsSuite` and `CsvExpressionsSuite`.

Closes #30180 from MaxGekk/schema_of_csv-sql-schema.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-29 21:02:10 +09:00
Max Gekk 9d5e48ea95 [SPARK-33270][SQL] Return SQL schema instead of Catalog string from the SchemaOfJson expression
### What changes were proposed in this pull request?
Return schema in SQL format instead of Catalog string from the `SchemaOfJson` expression.

### Why are the changes needed?
In some cases, `from_json()` cannot parse schemas returned by `schema_of_json`, for instance, when JSON fields have spaces (gaps). Such fields will be quoted after the changes, and can be parsed by `from_json()`.

Here is the example:
```scala
val in = Seq("""{"a b": 1}""").toDS()
in.select(from_json('value, schema_of_json("""{"a b": 100}""")) as "parsed")
```
raises the exception:
```
== SQL ==
struct<a b:bigint>
------^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:263)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:130)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseTableSchema(ParseDriver.scala:76)
	at org.apache.spark.sql.types.DataType$.fromDDL(DataType.scala:131)
	at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:33)
	at org.apache.spark.sql.catalyst.expressions.JsonToStructs.<init>(jsonExpressions.scala:537)
	at org.apache.spark.sql.functions$.from_json(functions.scala:4141)
```

### Does this PR introduce _any_ user-facing change?
Yes. For example, `schema_of_json` for the input `{"col":0}`.

Before: `struct<col:bigint>`
After: `STRUCT<`col`: BIGINT>`

### How was this patch tested?
By existing test suites `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Closes #30172 from MaxGekk/schema_of_json-sql-schema.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-29 10:30:41 +09:00
Jungtaek Lim (HeartSaVioR) a744fea3be [SPARK-33267][SQL] Fix NPE issue on 'In' filter when one of values contains null
### What changes were proposed in this pull request?

This PR proposes to fix the NPE issue on the `In` filter when one of the values contains null. In a real case, you can trigger this issue when you try to push down a filter with `in (..., null)` against a V2 source table. `DataSourceStrategy` caches the mapping (filter instance -> expression) in a HashMap, which leverages the hash code of the key, hence it could trigger the NPE issue.
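
A small illustration of the trigger; the attribute and values are made up:

```scala
import org.apache.spark.sql.sources.In

// Before the fix, hashing this filter (e.g. when it is used as a HashMap key
// in DataSourceStrategy) could throw a NullPointerException because one value is null.
val filter = In("dept", Array[Any]("a", null))
filter.hashCode()
```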

### Why are the changes needed?

This is an obvious bug, as the `In` filter doesn't handle null values when calculating its hash code.

### Does this PR introduce _any_ user-facing change?

Yes. Previously, a query having `null` in an "in" condition against a data source V2 table that supports filter pushdown failed with an NPE, whereas after the PR the query will not fail.

### How was this patch tested?

UT added. The new UT fails without the PR and passes with the PR.

Closes #30170 from HeartSaVioR/SPARK-33267.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-28 10:00:29 -07:00
zky.zhoukeyong b26ae98407 [SPARK-33208][SQL] Update the document of SparkSession#sql
Change-Id: I82db1f9e8f667573aa3a03e05152cbed0ea7686b

### What changes were proposed in this pull request?
Update the document of SparkSession#sql, mentioning that this API eagerly runs DDL/DML commands, but not SELECT queries.

### Why are the changes needed?
To clarify the behavior of SparkSession#sql.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Not needed.

Closes #30168 from waitinfuture/master.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-28 13:17:28 +00:00
gengjiaan 3c3ad5f7c0 [SPARK-32934][SQL] Improve the performance for NTH_VALUE and refactor the OffsetWindowFunction
### What changes were proposed in this pull request?
Spark SQL supports window functions like `NTH_VALUE`.
If we specify a window frame like `UNBOUNDED PRECEDING AND CURRENT ROW` or `UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`, we can eliminate some calculations.
For example: if we execute the SQL show below:
```
SELECT NTH_VALUE(col, 2) OVER (
  ORDER BY rank
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM tab;
```
For rows whose row number is greater than 1, the output is a fixed value; otherwise it is null. So we only need to calculate the value once and check whether the row number is less than 2.
`UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING` is simpler.

### Why are the changes needed?
Improve the performance for `NTH_VALUE`, `FIRST_VALUE` and `LAST_VALUE`.

### Does this PR introduce _any_ user-facing change?
 'No'.

### How was this patch tested?
Jenkins test.

Closes #29800 from beliefer/optimize-nth_value.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-28 06:40:23 +00:00
allisonwang-db 9fb45361fd [SPARK-33183][SQL] Fix Optimizer rule EliminateSorts and add a physical rule to remove redundant sorts
### What changes were proposed in this pull request?
This PR aims to fix a correctness bug in the optimizer rule `EliminateSorts`. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix.

### Why are the changes needed?
A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts.

```
Sort(orders, global = True, ...)
  Sort(orders, global = False, ...)
```

Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered.
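A minimal sketch of the distinction (assuming an active SparkSession `spark`; the column name is illustrative): the outer global sort has to stay even though its child is already sorted within each partition.

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(100).toDF("a")
val locallySorted  = df.sortWithinPartitions(col("a"))  // Sort(orders, global = false)
val globallySorted = locallySorted.orderBy(col("a"))    // Sort(orders, global = true): must not be eliminated
```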

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Unit tests

Closes #30093 from allisonwang-db/fix-sort.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-28 05:51:47 +00:00
Terry Kim 528160f001 [SPARK-33174][SQL] Migrate DROP TABLE to use UnresolvedTableOrView to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate `DROP TABLE` to use `UnresolvedTableOrView` to resolve the table/view identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

The current behavior is not consistent between v1 and v2 commands when resolving a temp view.
In v2, the `t` in the following example is resolved to a table:
```scala
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE testcat.ns")
sql("DROP TABLE t") // 't' is resolved to testcat.ns.t
```
whereas in v1, the `t` is resolved to a temp view:
```scala
sql("CREATE DATABASE test")
sql("CREATE TABLE spark_catalog.test.t (id bigint) USING csv")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE spark_catalog.test")
sql("DROP TABLE t") // 't' is resolved to a temp view
```

### Does this PR introduce _any_ user-facing change?

After this PR, for v2, `DROP TABLE t` is resolved to a temp view `t` instead of `testcat.ns.t`, consistent with v1 behavior.

### How was this patch tested?

Added a new test

Closes #30079 from imback82/drop_table_consistent.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-28 05:44:55 +00:00
Jungtaek Lim (HeartSaVioR) fcf8aa59b5 [SPARK-33240][SQL] Fail fast when fails to instantiate configured v2 session catalog
### What changes were proposed in this pull request?

This patch proposes to change the behavior to fail fast when Spark fails to instantiate the configured v2 session catalog.

### Why are the changes needed?

The Spark behavior is against the intention of end users: if they configure a session catalog that Spark fails to initialize, Spark swallows the error, only logging the error message, and silently uses the default catalog implementation.

This follows the voices on [discussion thread](https://lists.apache.org/thread.html/rdfa22a5ebdc4ac66e2c5c8ff0cd9d750e8a1690cd6fb456d119c2400%40%3Cdev.spark.apache.org%3E) in dev mailing list.
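For context, a hedged sketch of how such a catalog is configured (the catalog class name below is hypothetical); after this PR, a failure to instantiate it surfaces immediately:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // If com.example.MySessionCatalog cannot be instantiated, Spark now fails
  // right away instead of silently using the default session catalog.
  .config("spark.sql.catalog.spark_catalog", "com.example.MySessionCatalog")
  .getOrCreate()
```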

### Does this PR introduce _any_ user-facing change?

Yes. After the PR, Spark will fail immediately if it fails to instantiate the configured session catalog.

### How was this patch tested?

New UT added.

Closes #30147 from HeartSaVioR/SPARK-33240.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-28 03:31:11 +00:00
Ankur Dave 3f2a2b5fe6 [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
### What changes were proposed in this pull request?

The following query produces incorrect results. The query has two essential features: (1) it contains a string aggregate, resulting in a `SortExec` node, and (2) it contains a duplicate grouping key, causing `RemoveRepetitionFromGroupExpressions` to produce a sort order stored as a `Stream`.

```sql
SELECT bigint_col_1, bigint_col_9, MAX(CAST(bigint_col_1 AS string))
FROM table_4
GROUP BY bigint_col_1, bigint_col_9, bigint_col_9
```

When the sort order is stored as a `Stream`, the line `ordering.map(_.child.genCode(ctx))` in `GenerateOrdering#createOrderKeys()` produces unpredictable side effects to `ctx`. This is because `genCode(ctx)` modifies `ctx`. When ordering is a `Stream`, the modifications will not happen immediately as intended, but will instead occur lazily when the returned `Stream` is used later.

Similar bugs have occurred at least three times in the past: https://issues.apache.org/jira/browse/SPARK-24500, https://issues.apache.org/jira/browse/SPARK-25767, https://issues.apache.org/jira/browse/SPARK-26680.

The fix is to check if `ordering` is a `Stream` and force the modifications to happen immediately if so.
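A standalone illustration of the laziness hazard (plain Scala, not Spark code):

```scala
import scala.collection.mutable.ArrayBuffer

val ctx = ArrayBuffer.empty[String]       // stands in for the mutable codegen context
val ordering = Stream("a", "b", "c")
val keys = ordering.map { k => ctx += s"gen($k)"; k }
// Stream.map is lazy beyond the head, so most side effects have not run yet
// and `ctx` does not contain code for every ordering expression.
println(ctx.size)
keys.force                                // forcing the Stream runs the remaining effects now
println(ctx.size)                         // 3
```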

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test for `SortExec` where `sortOrder` is a `Stream`. The test previously failed and now passes.

Closes #30160 from ankurdave/SPARK-33260.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-27 13:20:22 -07:00
Huaxin Gao f284218dae [SPARK-33137][SQL] Support ALTER TABLE in JDBC v2 Table Catalog: update type and nullability of columns (Postgres dialect)
### What changes were proposed in this pull request?
Override the default SQL strings in Postgres Dialect for:

- ALTER TABLE UPDATE COLUMN TYPE
- ALTER TABLE UPDATE COLUMN NULLABILITY

Add new docker integration test suite `jdbc/v2/PostgreSQLIntegrationSuite.scala`
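A hedged usage sketch of the kind of statements this enables (assuming a JDBC v2 catalog registered under the illustrative name `postgresql`; schema, table and column names are made up):

```scala
spark.sql("ALTER TABLE postgresql.test.alt_table ALTER COLUMN c1 TYPE STRING")
spark.sql("ALTER TABLE postgresql.test.alt_table ALTER COLUMN c1 DROP NOT NULL")
```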

### Why are the changes needed?
Supports Postgres-specific ALTER TABLE syntax.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test `PostgreSQLIntegrationSuite`

Closes #30089 from huaxingao/postgres_docker.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-27 15:04:53 +00:00
xuewei.linxuewei 537a49fc09 [SPARK-33140][SQL] remove SQLConf and SparkSession in all sub-class of Rule[QueryPlan]
### What changes were proposed in this pull request?

Since issue [SPARK-33139](https://issues.apache.org/jira/browse/SPARK-33139) has been done, SQLConf.get and SparkSession.active are now more reliable. We are refining the existing code that passes SQLConf and SparkSession into sub-classes of Rule[QueryPlan].

In this PR.

* remove SQLConf from the constructor parameters of all sub-classes of Rule[QueryPlan].
* use SQLConf.get to replace the original SQLConf instance.
* remove SparkSession from the constructor parameters of all sub-classes of Rule[QueryPlan].
* use SparkSession.active to replace the original SparkSession instance.
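A minimal sketch of the before/after pattern (the rule itself is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Before: the conf is threaded through the constructor.
case class MyRule(sqlConf: SQLConf) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan =
    if (sqlConf.caseSensitiveAnalysis) plan else plan  // placeholder transformation
}

// After: the rule reads the active conf via SQLConf.get instead.
object MyRefinedRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan =
    if (SQLConf.get.caseSensitiveAnalysis) plan else plan  // placeholder transformation
}
```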

### Why are the changes needed?

Code refinement.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

Existing test

Closes #30097 from leanken/leanken-SPARK-33140.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-27 12:40:57 +00:00
angerszhu e43cd8ccef [SPARK-32388][SQL] TRANSFORM with schema-less mode should keep the same with hive
### What changes were proposed in this pull request?
In the current Spark script transformation with Hive serde mode, in the schema-less case, the result is different from Hive's.
This PR keeps the result the same as Hive's script transform with serde.

#### Hive script transform with serde in schema-less mode
```
hive> create table t (c0 int, c1 int, c2 int);
hive> INSERT INTO t VALUES (1, 1, 1);
hive> INSERT INTO t VALUES (2, 2, 2);
hive> CREATE VIEW v AS SELECT TRANSFORM(c0, c1, c2) USING 'cat' FROM t;

hive> DESCRIBE v;
key                 	string
value               	string

hive> SELECT * FROM v;
1	1	1
2	2	2

hive> SELECT key FROM v;
1
2

hive> SELECT value FROM v;
1	1
2	2
```

#### Spark script transform with Hive serde in schema-less mode
```
hive> create table t (c0 int, c1 int, c2 int);
hive> INSERT INTO t VALUES (1, 1, 1);
hive> INSERT INTO t VALUES (2, 2, 2);
hive> CREATE VIEW v AS SELECT TRANSFORM(c0, c1, c2) USING 'cat' FROM t;

hive> SELECT * FROM v;
1   1
2   2
```

**No-serde mode in Hive (ROW FORMAT DELIMITED)**
![image](https://user-images.githubusercontent.com/46485123/90088770-55841e00-dd52-11ea-92dd-7fe52d93f0b3.png)

### Why are the changes needed?
Keep the same behavior as Hive script transform.

### Does this PR introduce _any_ user-facing change?
Before this PR, with Hive serde script transform:
```
select transform(*)
USING 'cat'
from (
select 1, 2, 3, 4
) tmp

key     value
1         2
```
After
```
select transform(*)
USING 'cat'
from (
select 1, 2, 3, 4
) tmp

key     value
1         2   3  4
```
### How was this patch tested?
UT

Closes #29421 from AngersZhuuuu/SPARK-32388.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-27 09:25:53 +09:00
Steve Loughran 02fa19f102 [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
### What changes were proposed in this pull request?

This reinstates the old option `spark.sql.sources.writeJobUUID` to set a unique job ID in the job conf so that Hadoop MR committers have a unique ID which is (a) consistent across tasks and workers and (b) not brittle compared to generated-timestamp job IDs. The latter matches what JobID requires, but as such IDs are generated per thread, they may not always be unique within a cluster.
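A hedged sketch of how a committer might consume the reinstated property (the committer class itself is hypothetical):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

class UuidAwareCommitter(output: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(output, context) {

  override def setupJob(jobContext: JobContext): Unit = {
    // Consistent across all tasks of this write job, unique across jobs.
    val jobUuid = jobContext.getConfiguration.get("spark.sql.sources.writeJobUUID")
    // e.g. stage work under a job-unique directory such as new Path(output, s".staging-$jobUuid")
    super.setupJob(jobContext)
  }
}
```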

### Why are the changes needed?

If a committer (e.g. the s3a staging committer) uses the job-attempt ID as a unique ID, then any two jobs started within the same second have the same ID and can clash.

### Does this PR introduce _any_ user-facing change?

Good Q. It is "developer-facing" in the context of anyone writing a committer. But it reinstates a property which was in Spark 1.x and "went away"

### How was this patch tested?

Testing: no test here. You'd have to create a new committer which extracted the value in both job and task(s) and verified consistency. That is possible (with a task output whose records contained the UUID), but it would be pretty convoluted and a high maintenance cost.

Because it's trying to address a race condition, it's hard to regenerate the problem downstream and so verify a fix in a test run...I'll just look at the logs to see what temporary dir is being used in the cluster FS and verify it's a UUID

Closes #30141 from steveloughran/SPARK-33230-jobId.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-26 12:31:05 -07:00
Cheng Su 1042d49bf9 [SPARK-33075][SQL] Enable auto bucketed scan by default (disable only for cached query)
### What changes were proposed in this pull request?

This PR enables auto bucketed table scan by default, with the exception of cached queries, for which it stays disabled (similar to AQE). The reason for disabling auto scan for cached queries is that the cached query's output partitioning can be leveraged later to avoid shuffle and sort when doing joins and aggregates.

### Why are the changes needed?

Enabling auto bucketed table scan by default is useful as it can optimize queries automatically under the hood, without user interaction.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test for cached queries in `DisableUnnecessaryBucketedScanSuite.scala`. Also changed a bunch of unit tests that should disable auto bucketed scan to make them work.

Closes #30138 from c21/enable-auto-bucket.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-10-26 20:23:24 +09:00
Cheng Su d87a0bb2ca [SPARK-32862][SS] Left semi stream-stream join
### What changes were proposed in this pull request?

This is to support left semi join in stream-stream join. The implementation of left semi join is (mostly in `StreamingSymmetricHashJoinExec` and `SymmetricHashJoinStateManager`):
* For left side input row, check if there's a match on right side state store.
  * if there's a match, output the left side row, but do not put the row in left side state store (no need to put in state store).
  * if there's no match, output nothing, but put the row in left side state store (with "matched" field to set to false in state store).
* For right side input row, check if there's a match on left side state store.
  * For all matched left rows in state store, output the rows with "matched" field as false. Set all left rows with "matched" field to be true. Only output the left side rows matched for the first time to guarantee left semi join semantics.
* State store eviction: evict rows from left/right side state store below watermark, same as inner join.

Note a followup optimization can be to evict matched left side rows from state store earlier, even when the rows are still above watermark. However this needs more change in `SymmetricHashJoinStateManager`, so will leave this as a followup.

### Why are the changes needed?

Current stream-stream join supports inner, left outer and right outer joins (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L166). Internally we see a lot of users using left semi stream-stream joins (outside Spark Structured Streaming), e.g. "I want to get the ad impressions (left side) which have a click (right side), but I don't care how many clicks there are per ad" (left semi semantics).
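A hedged usage sketch of that scenario (sources, column names and the time bound are illustrative, assuming an active SparkSession `spark`):

```scala
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "10 minutes")

val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS clickTime")
  .withWatermark("clickTime", "20 minutes")

// Emit each impression at most once, as soon as a matching click is seen.
val matchedImpressions = impressions.as("i").join(
  clicks.as("c"),
  expr("i.adId = c.adId AND " +
    "c.clickTime BETWEEN i.impressionTime AND i.impressionTime + interval 1 hour"),
  "left_semi")
```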

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests in `UnsupportedOperationChecker.scala` and `StreamingJoinSuite.scala`.

Closes #30076 from c21/stream-join.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-10-26 13:33:06 +09:00
HyukjinKwon 369cc614f3 Revert "[SPARK-32388][SQL] TRANSFORM with schema-less mode should keep the same with hive"
This reverts commit 56ab60fb7a.
2020-10-26 11:38:48 +09:00
angerszhu 56ab60fb7a [SPARK-32388][SQL] TRANSFORM with schema-less mode should keep the same with hive
### What changes were proposed in this pull request?
In the current Spark script transformation with Hive serde mode, in the schema-less case, the result is different from Hive's.
This PR keeps the result the same as Hive's script transform with serde.

#### Hive script transform with serde in schema-less mode
```
hive> create table t (c0 int, c1 int, c2 int);
hive> INSERT INTO t VALUES (1, 1, 1);
hive> INSERT INTO t VALUES (2, 2, 2);
hive> CREATE VIEW v AS SELECT TRANSFORM(c0, c1, c2) USING 'cat' FROM t;

hive> DESCRIBE v;
key                 	string
value               	string

hive> SELECT * FROM v;
1	1	1
2	2	2

hive> SELECT key FROM v;
1
2

hive> SELECT value FROM v;
1	1
2	2
```

#### Spark script transform with Hive serde in schema-less mode
```
hive> create table t (c0 int, c1 int, c2 int);
hive> INSERT INTO t VALUES (1, 1, 1);
hive> INSERT INTO t VALUES (2, 2, 2);
hive> CREATE VIEW v AS SELECT TRANSFORM(c0, c1, c2) USING 'cat' FROM t;

hive> SELECT * FROM v;
1   1
2   2
```

**No-serde mode in Hive (ROW FORMAT DELIMITED)**
![image](https://user-images.githubusercontent.com/46485123/90088770-55841e00-dd52-11ea-92dd-7fe52d93f0b3.png)

### Why are the changes needed?
Keep the same behavior as Hive script transform.

### Does this PR introduce _any_ user-facing change?
Before this PR, with Hive serde script transform:
```
select transform(*)
USING 'cat'
from (
select 1, 2, 3, 4
) tmp

key     value
1         2
```
After
```
select transform(*)
USING 'cat'
from (
select 1, 2, 3, 4
) tmp

key     value
1         2   3  4
```
### How was this patch tested?
UT

Closes #29421 from AngersZhuuuu/SPARK-32388.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-26 11:20:29 +09:00
Takeshi Yamamuro 87b498462b [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
### What changes were proposed in this pull request?

SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. But this change drops the cache even when replacing a view having the same logical plan. A sequence of queries to reproduce this is as follows:
```
// Spark v2.4.6+
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
      +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
               +- *(1) Range (0, 1, step=1, splits=4)

scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
      +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
               +- *(1) Range (0, 1, step=1, splits=4)

// If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache's swept away
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)

// Until v2.4.6
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
   +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
            +- *(1) Range (0, 1, step=1, splits=4)

scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
   +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
            +- *(1) Range (0, 1, step=1, splits=4)
```

### Why are the changes needed?

bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #30140 from maropu/FixBugInReplaceView.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-25 16:15:55 -07:00
Jungtaek Lim (HeartSaVioR) 0c66a88d1d [SPARK-29438][SS][FOLLOWUP] Add regression tests for Streaming Aggregation and flatMapGroupsWithState
### What changes were proposed in this pull request?

This patch adds new UTs to prevent a regression of SPARK-29438 for streaming aggregation as well as flatMapGroupsWithState, following the agreement on the review comment quoted here:

https://github.com/apache/spark/pull/26162#issuecomment-576929692

> LGTM for this PR. But on an additional note, this is a very subtle and easy-to-make bug with TaskContext.getPartitionId. I wonder if this bug is present in any other stateful operation. Can you please verify how partitionId is used in the other stateful operations?

For now they're not broken, but it's even better to have UTs to prevent such regressions in the future.

### Why are the changes needed?

The new UTs will prevent streaming aggregation and flatMapGroupsWithState from being broken in the future when they are placed on the right side of a UNION and the number of partitions changes on the left side of the UNION. Please refer to SPARK-29438 for more details.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UTs.

Closes #27333 from HeartSaVioR/SPARK-29438-add-regression-test.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-10-24 15:36:41 -07:00
Kent Yao e21bb710e5 [SPARK-32991][SQL] Use conf in shared state as the original configuration for RESET
### What changes were proposed in this pull request?

#### Case

The case here covers the behavior of static and dynamic SQL configs in `sharedState` and `sessionState`, and the specially handled config `spark.sql.warehouse.dir`.
The case can be found here: https://github.com/yaooqinn/sugar/blob/master/src/main/scala/com/netease/mammut/spark/training/sql/WarehouseSCBeforeSS.scala

```scala

import java.lang.reflect.Field

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object WarehouseSCBeforeSS extends App {

  val wh = "spark.sql.warehouse.dir"
  val td = "spark.sql.globalTempDatabase"
  val custom = "spark.sql.custom"

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("SPARK-32991")
    .set(wh, "./data1")
    .set(td, "bob")

  val sc = new SparkContext(conf)

  val spark = SparkSession.builder()
    .config(wh, "./data2")
    .config(td, "alice")
    .config(custom, "kyao")
    .getOrCreate()

  val confField: Field = spark.sharedState.getClass.getDeclaredField("conf")
  confField.setAccessible(true)
  private val shared: SparkConf = confField.get(spark.sharedState).asInstanceOf[SparkConf]
  println()
  println(s"=====> SharedState: $wh=${shared.get(wh)}")
  println(s"=====> SharedState: $td=${shared.get(td)}")
  println(s"=====> SharedState: $custom=${shared.get(custom, "")}")

  println(s"=====> SessionState: $wh=${spark.conf.get(wh)}")
  println(s"=====> SessionState: $td=${spark.conf.get(td)}")
  println(s"=====> SessionState: $custom=${spark.conf.get(custom, "")}")

  val spark2 = SparkSession.builder().config(td, "fred").getOrCreate()

  println(s"=====> SessionState 2: $wh=${spark2.conf.get(wh)}")
  println(s"=====> SessionState 2: $td=${spark2.conf.get(td)}")
  println(s"=====> SessionState 2: $custom=${spark2.conf.get(custom, "")}")

  SparkSession.setActiveSession(spark)
  spark.sql("RESET")

  println(s"=====> SessionState RESET: $wh=${spark.conf.get(wh)}")
  println(s"=====> SessionState RESET: $td=${spark.conf.get(td)}")
  println(s"=====> SessionState RESET: $custom=${spark.conf.get(custom, "")}")

  val spark3 = SparkSession.builder().getOrCreate()

  println(s"=====> SessionState 3: $wh=${spark2.conf.get(wh)}")
  println(s"=====> SessionState 3: $td=${spark2.conf.get(td)}")
  println(s"=====> SessionState 3: $custom=${spark2.conf.get(custom, "")}")
}
```

#### outputs and analysis
```
// 1. Make the cloned spark conf in shared state respect the warehouse dir from the 1st SparkSession
//=====> SharedState: spark.sql.warehouse.dir=./data1
// 2. 
//=====> SharedState: spark.sql.globalTempDatabase=alice
//=====> SharedState: spark.sql.custom=kyao
//=====> SessionState: spark.sql.warehouse.dir=./data2
//=====> SessionState: spark.sql.globalTempDatabase=alice
//=====> SessionState: spark.sql.custom=kyao
//=====> SessionState 2: spark.sql.warehouse.dir=./data2
//=====> SessionState 2: spark.sql.globalTempDatabase=alice
//=====> SessionState 2: spark.sql.custom=kyao
// 2'.🔼 OK until here
// 3. Make the below 3 ones respect the cloned spark conf in shared state with issue 1 fixed
//=====> SessionState RESET: spark.sql.warehouse.dir=./data1
//=====> SessionState RESET: spark.sql.globalTempDatabase=bob
//=====> SessionState RESET: spark.sql.custom=
// 4. Then the SparkSessions created after RESET will be corrected.
//=====> SessionState 3: spark.sql.warehouse.dir=./data1
//=====> SessionState 3: spark.sql.globalTempDatabase=bob
//=====> SessionState 3: spark.sql.custom=
```

In this PR, we gather all valid configs into the cloned conf of `sharedState` while it is being constructed (actually, only `spark.sql.warehouse.dir` was missing). Then we use this conf as the defaults for the `RESET` command.

`SparkSession.clearActiveSession/clearDefaultSession` will make the shared state invisible and unsharable. They will be internal-only soon (confirmed with Wenchen), so cases where they are called will not be a problem.

### Why are the changes needed?

Bug fix for the programmatic API calling RESET when users create a SparkContext first and configure the SparkSession later.

### Does this PR introduce _any_ user-facing change?

Yes. Before this change, when you use the programmatic API and call RESET, all configs are reset to SparkContext.conf; now they are reset to SparkSession.sharedState.conf.

### How was this patch tested?

new tests

Closes #30045 from yaooqinn/SPARK-32991.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-23 05:52:38 +00:00
Max Gekk a03d77d326 [SPARK-33160][SQL][FOLLOWUP] Replace the parquet metadata key org.apache.spark.int96NoRebase by org.apache.spark.legacyINT96
### What changes were proposed in this pull request?
1. Replace the metadata key `org.apache.spark.int96NoRebase` by `org.apache.spark.legacyINT96`.
2. Change the condition when new key should be saved to parquet metadata: it should be saved when the SQL config `spark.sql.legacy.parquet.int96RebaseModeInWrite` is set to `LEGACY`.
3. Change the handling of the metadata key in read:
    - If the key is not present in the parquet metadata, take the rebase mode from the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
    - If parquet files were saved by Spark < 3.1.0, use the `LEGACY` rebasing mode for the INT96 type.
    - For files written by Spark >= 3.1.0, if `org.apache.spark.legacyINT96` is present in the metadata, perform rebasing; otherwise don't.

### Why are the changes needed?
- To not increase parquet size by default when `spark.sql.legacy.parquet.int96RebaseModeInWrite` is `EXCEPTION` after https://github.com/apache/spark/pull/30121.
- To have the implementation similar to `org.apache.spark.legacyDateTime`
- To minimise impact on other subsystems that are based on file sizes like gathering statistics.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified test in `ParquetIOSuite`

Closes #30132 from MaxGekk/int96-flip-metadata-rebase-key.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-22 15:57:03 +00:00
yangjie01 b38f3a5557 [SPARK-32978][SQL] Make sure the number of dynamic part metric is correct
### What changes were proposed in this pull request?

The purpose of this PR is to resolve SPARK-32978.

The main reason for the bad case described in SPARK-32978 is that `BasicWriteTaskStatsTracker` directly reports the number of newly added partitions of each task, which makes it impossible to deduplicate the data on the driver side.

The main change of this PR is to report the partitionValues to the driver and deduplicate them on the driver side, to make sure the number in the dynamic partitions metric is correct.
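A hedged sketch of the idea only (not the actual tracker code): tasks report the partition values they wrote, and the driver deduplicates them before updating the metric.

```scala
// Two tasks happened to write into the same partition p=1.
val partitionsReportedByTasks = Seq(Seq("p=1"), Seq("p=1"), Seq("p=2"))
val numDynamicParts = partitionsReportedByTasks.flatten.distinct.size
assert(numDynamicParts == 2)  // summing per-task counts would wrongly report 3
```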

### Why are the changes needed?
The number of dynamic partitions metric displayed on the UI should be correct.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test case for the scenario described in SPARK-32978.

Closes #30026 from LuciferYang/SPARK-32978.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-22 14:01:07 +00:00
Prashant Sharma 8cae7f88b0 [SPARK-33095][SQL] Support ALTER TABLE in JDBC v2 Table Catalog: add, update type and nullability of columns (MySQL dialect)
### What changes were proposed in this pull request?

Override the default SQL strings in the MySQL JDBC dialect, according to the official documentation, for:

- ALTER TABLE UPDATE COLUMN TYPE
- ALTER TABLE UPDATE COLUMN NULLABILITY

Write MySQL integration tests for JDBC.

### Why are the changes needed?
Improves code coverage and supports the MySQL dialect for JDBC.

### Does this PR introduce _any_ user-facing change?

Yes, Support ALTER TABLE in JDBC v2 Table Catalog: add, update type and nullability of columns (MySQL dialect)

### How was this patch tested?

Added tests.

Closes #30025 from ScrapCodes/mysql-dialect.

Authored-by: Prashant Sharma <prashsh1@in.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-22 13:51:42 +00:00
gengjiaan eb33bcb4b2 [SPARK-30796][SQL] Add parameter position for REGEXP_REPLACE
### What changes were proposed in this pull request?
`REGEXP_REPLACE` replaces all substrings of a string that match a regexp with the replacement string.
But `REGEXP_REPLACE` lacks some flexibility, such as converting camel-case strings to strings of lower-case words separated by underscores:
AddressLine1 -> address_line_1
If we support the position parameter, we can do it like this (e.g. in Oracle):

```
WITH strings as (
  SELECT 'AddressLine1' s FROM dual union all
  SELECT 'ZipCode' s FROM dual union all
  SELECT 'Country' s FROM dual
)
  SELECT s "STRING",
         lower(regexp_replace(s, '([A-Z0-9])', '_\1', 2)) "MODIFIED_STRING"
  FROM strings;
```
The output:
```
  STRING               MODIFIED_STRING
-------------------- --------------------
AddressLine1         address_line_1
ZipCode              zip_code
Country              country
```

Several mainstream databases support this syntax.

**Oracle**
https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/REGEXP_REPLACE.html#GUID-EA80A33C-441A-4692-A959-273B5A224490

**Vertica**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Functions/RegularExpressions/REGEXP_REPLACE.htm?zoom_highlight=regexp_replace

**Redshift**
https://docs.aws.amazon.com/redshift/latest/dg/REGEXP_REPLACE.html

### Why are the changes needed?
The position parameter for `REGEXP_REPLACE` is very useful.

### Does this PR introduce _any_ user-facing change?
'Yes'.

### How was this patch tested?
Jenkins test.

Closes #29891 from beliefer/add-position-for-regex_replace.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-22 07:59:49 +00:00
Max Gekk ba13b94f6b [SPARK-33210][SQL] Set the rebasing mode for parquet INT96 type to EXCEPTION by default
### What changes were proposed in this pull request?
1. Set the default value for the SQL configs `spark.sql.legacy.parquet.int96RebaseModeInWrite` and `spark.sql.legacy.parquet.int96RebaseModeInRead` to `EXCEPTION`.
2. Update the SQL migration guide.

### Why are the changes needed?
The current default value `LEGACY` may lead to shifted timestamps on read or write. We should leave the decision about rebasing to users.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By existing test suites like `ParquetIOSuite`.

Closes #30121 from MaxGekk/int96-exception-by-default.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-22 03:04:29 +00:00
Max Gekk bbf2d6f6df [SPARK-33160][SQL][FOLLOWUP] Update benchmarks of INT96 type rebasing
### What changes were proposed in this pull request?
1. Turn off/on the SQL config `spark.sql.legacy.parquet.int96RebaseModeInWrite` which was added by https://github.com/apache/spark/pull/30056 in `DateTimeRebaseBenchmark`. The parquet readers should infer correct rebasing mode automatically from metadata.
2. Regenerate benchmark results of `DateTimeRebaseBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge (spot instance) |
| AMI | ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1) |
| Java | OpenJDK8/11 installed by`sudo add-apt-repository ppa:openjdk-r/ppa` & `sudo apt install openjdk-11-jdk`|

### Why are the changes needed?
To have up-to-date info about the performance of INT96, which is the default physical type for Catalyst's timestamp type.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By updating benchmark results:
```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.DateTimeRebaseBenchmark"
```

Closes #30118 from MaxGekk/int96-rebase-benchmark.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-22 10:03:41 +09:00
Gabor Somogyi fbb6843620 [SPARK-32229][SQL] Fix PostgresConnectionProvider and MSSQLConnectionProvider by accessing wrapped driver
### What changes were proposed in this pull request?
Postgres and MSSQL connection providers are not able to get the custom `appEntry` because under some circumstances the driver is wrapped with `DriverWrapper`. Such a case is not handled in the mentioned providers. In this PR I've added handling for this edge case by passing the unwrapped `Driver` from `JdbcUtils`.

### Why are the changes needed?
`DriverWrapper` is not considered.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing + additional unit tests.

Closes #30024 from gaborgsomogyi/SPARK-32229.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-10-20 15:14:38 +09:00
Max Gekk a44e008de3 [SPARK-33160][SQL] Allow saving/loading INT96 in parquet w/o rebasing
### What changes were proposed in this pull request?
1. Add the SQL config `spark.sql.legacy.parquet.int96RebaseModeInWrite` to control timestamps rebasing in saving them as INT96. It supports the same set of values as `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` but the default value is `LEGACY` to preserve backward compatibility with Spark <= 3.0.
2. Write the metadata key `org.apache.spark.int96NoRebase` to parquet files if the files are saved with `spark.sql.legacy.parquet.int96RebaseModeInWrite` isn't set to `LEGACY`.
3. Add the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead` to control loading INT96 timestamps when parquet metadata doesn't have enough info (the `org.apache.spark.int96NoRebase` tag) about parquet writer - either INT96 was written by Proleptic Gregorian system or some Julian one.
4. Modified Vectorized and Parquet-mr Readers to support loading/saving INT96 timestamps w/o rebasing depending on SQL config and the metadata tag:
    - **No rebasing** in testing when the SQL config `spark.test.forceNoRebase` is set to `true`
    - **No rebasing** if parquet metadata contains the tag `org.apache.spark.int96NoRebase`. This is the case when parquet files are saved by Spark >= 3.1 with `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` is set to `CORRECTED`, or saved by other systems with the tag `org.apache.spark.int96NoRebase`.
    - **With rebasing** if parquet files saved by Spark (any versions) without the metadata tag `org.apache.spark.int96NoRebase`.
    - Rebasing depend on the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead` if there are no metadata tags `org.apache.spark.version` and `org.apache.spark.int96NoRebase`.

New SQL configs are added instead of re-using existing `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` and `spark.sql.legacy.parquet.datetimeRebaseModeInRead` because of:
- To allow users have different modes for INT96 and for TIMESTAMP_MICROS (MILLIS). For example, users might want to save INT96 as LEGACY but TIMESTAMP_MICROS as CORRECTED.
- To have different modes for INT96 and DATE in load (or in save).
- To be backward compatible with Spark 2.4. For now, `spark.sql.legacy.parquet.datetimeRebaseModeInWrite/Read` are set to `EXCEPTION` by default.
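A hedged usage sketch (the path and `df` are illustrative, assuming an active SparkSession `spark`): save INT96 timestamps without rebasing by choosing the CORRECTED mode.

```scala
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
df.write.parquet("/tmp/ts_corrected")  // files are tagged so readers can skip rebasing
```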

### Why are the changes needed?
1. The Parquet spec says that INT96 must be stored as Julian days (see https://github.com/apache/parquet-format/pull/49). This doesn't mean that a reader (or a writer) is based on the Julian calendar. So rebasing from the Proleptic Gregorian to the Julian calendar may not be needed.
2. Rebasing from/to the Julian calendar can lose information because dates in one calendar don't exist in the other. For example, 1582-10-04..1582-10-15 exist in the Proleptic Gregorian calendar but not in the hybrid calendar (Julian + Gregorian), and vice versa, the Julian date 1000-02-29 doesn't exist in the Proleptic Gregorian calendar. We should allow users to save timestamps without losing such dates (rebasing shifts such dates to the next valid date).
3. It would also make Spark compatible with other systems such as Impala and newer versions of Hive that write proleptic Gregorian based INT96 timestamps.

### Does this PR introduce _any_ user-facing change?
It can, when `spark.sql.legacy.parquet.int96RebaseModeInWrite` is set to a non-default value, i.e. something other than `LEGACY`.

### How was this patch tested?
- Added a test to check the metadata key `org.apache.spark.int96NoRebase`
- By `ParquetIOSuite`

Closes #30056 from MaxGekk/parquet-rebase-int96.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-20 14:58:59 +09:00
Nan Zhu 35133901f7 [SPARK-32351][SQL] Show partially pushed down partition filters in explain()
### What changes were proposed in this pull request?

Currently, actual non-dynamic partition pruning is executed in the optimizer phase (PruneFileSourcePartitions) if an input relation has a catalog file index. The current code assumes the same partition filters are generated again in FileSourceStrategy and passed into FileSourceScanExec. FileSourceScanExec uses the partition filters when listing files, but these non-dynamic partition filters do nothing because unnecessary partitions are already pruned in advance, so the filters are mainly used for explain output in this case. If a WHERE clause has DNF-ed predicates, FileSourceStrategy cannot extract the same filters with PruneFileSourcePartitions and then PartitionFilters is not shown in explain output.

This patch proposes to extract the partition filters again in FileSourceStrategy and HiveStrategy with `extractPredicatesWithinOutputSet`, added in https://github.com/apache/spark/pull/29101/files#diff-6be42cfa3c62a7536b1eb1d6447c073c; it will then show the partially pushed-down partition filters in explain().

### Why are the changes needed?

without the patch, the explained plan is inconsistent with what is actually executed

<b>Without the change</b>, the explained plans of `"SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)"` for datasource and Hive tables are like the following, respectively (missing the pushed-down partition filters):

```
== Physical Plan ==
*(1) Filter ((p#21 = 1) OR ((p#21 = 2) AND (i#20 = 1)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/nanzhu/code/spark/sql/hive/target/tmp/hive_execution_test_group/war..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int>
```

```
   == Physical Plan ==
   *(1) Filter ((p#33 = 1) OR ((p#33 = 2) AND (i#32 = 1)))
   +- Scan hive default.t [i#32, p#33], HiveTableRelation [`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [i#32], Partition Cols: [p#33], Pruned Partitions: [(p=1), (p=2)]]
```

<b>With the change</b>, the plans look like the following (the actually executed partition filters are exhibited):

```
== Physical Plan ==
*(1) Filter ((p#21 = 1) OR ((p#21 = 2) AND (i#20 = 1)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/nanzhu/code/spark/sql/hive/target/tmp/hive_execution_test_group/war..., PartitionFilters: [((p#21 = 1) OR (p#21 = 2))], PushedFilters: [], ReadSchema: struct<i:int>
```

```
== Physical Plan ==
*(1) Filter ((p#37 = 1) OR ((p#37 = 2) AND (i#36 = 1)))
+- Scan hive default.t [i#36, p#37], HiveTableRelation [`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [i#36], Partition Cols: [p#37], Pruned Partitions: [(p=1), (p=2)]], [((p#37 = 1) OR (p#37 = 2))]
```

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?
Unit test.

Closes #29831 from CodingCat/SPARK-32351.

Lead-authored-by: Nan Zhu <nanzhu@uber.com>
Co-authored-by: Nan Zhu <CodingCat@users.noreply.github.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-20 11:13:16 +09:00
Max Gekk 26b13c70c3 [SPARK-33169][SQL][TESTS] Check propagation of datasource options to underlying file system for built-in file-based datasources
### What changes were proposed in this pull request?
1. Add the common trait `CommonFileDataSourceSuite` with tests that can be executed for all built-in file-based datasources.
2. Add a test to `CommonFileDataSourceSuite` to check that datasource options are propagated to underlying file systems as Hadoop configs.
3. Mix `CommonFileDataSourceSuite` into `AvroSuite`, `OrcSourceSuite`, `TextSuite`, `JsonSuite`, `CSVSuite` and `ParquetFileFormatSuite`.
4. Remove duplicated tests from `AvroSuite` and from `OrcSourceSuite`.

### Why are the changes needed?
To improve test coverage and test all built-in file-based datasources.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites.

Closes #30067 from MaxGekk/ds-options-common-test.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-19 17:47:49 +09:00
Liang-Chi Hsieh 3010e9044e [SPARK-33170][SQL] Add SQL config to control fast-fail behavior in FileFormatWriter
### What changes were proposed in this pull request?

This patch proposes to add a config to control the fast-fail behavior in FileFormatWriter and sets it to false by default.

### Why are the changes needed?

In SPARK-29649, we catch `FileAlreadyExistsException` in `FileFormatWriter` and fail fast for the task set to prevent task retry.

Based on the latest discussion, it is important to be able to keep the original behavior, which is to retry tasks even when `FileAlreadyExistsException` is thrown, because `FileAlreadyExistsException` could be recoverable in some cases.

We are going to add a config to control this behavior and set it to false by default, i.e. no fast-fail.

### Does this PR introduce _any_ user-facing change?

Yes. By default, tasks in FileFormatWriter will retry even if `FileAlreadyExistsException` is thrown, which is the behavior before Spark 3.0. Users can get the fast-fail behavior by enabling the new config.

### How was this patch tested?

Unit test.

Closes #30073 from viirya/SPARK-33170.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-17 21:02:25 -07:00
Liang-Chi Hsieh e574fcd230 [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
### What changes were proposed in this pull request?

SPARK-29358 added support for `unionByName` to work when the two datasets didn't necessarily have the same schema, but it does not work with nested columns like structs. This patch adds the support to work with struct columns.

The behavior before this PR:

```scala
scala> val df1 = spark.range(1).selectExpr("id c0", "named_struct('c', id + 1, 'b', id + 2, 'a', id + 3) c1")
scala> val df2 = spark.range(1).selectExpr("id c0", "named_struct('c', id + 1, 'b', id + 2) c1")
scala> df1.unionByName(df2, true).printSchema
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<c:bigint,b:bigint> <> struct<c:bigint,b:bigint,a:bigint> at the second column of the second table;;
'Union false, false
:- Project [id#0L AS c0#2L, named_struct(c, (id#0L + cast(1 as bigint)), b, (id#0L + cast(2 as bigint)), a, (id#0L + cast(3 as bigint))) AS c1#3]
:  +- Range (0, 1, step=1, splits=Some(12))
+- Project [c0#8L, c1#9]
   +- Project [id#6L AS c0#8L, named_struct(c, (id#6L + cast(1 as bigint)), b, (id#6L + cast(2 as bigint))) AS c1#9]
      +- Range (0, 1, step=1, splits=Some(12))
```

The behavior after this PR:

```scala
scala> df1.unionByName(df2, true).printSchema
root
 |-- c0: long (nullable = false)
 |-- c1: struct (nullable = false)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = false)
 |    |-- c: long (nullable = false)
scala> df1.unionByName(df2, true).show()
+---+-------------+
| c0|           c1|
+---+-------------+
|  0|    {3, 2, 1}|
|  0|{ null, 2, 1}|
+---+-------------+
```

### Why are the changes needed?

The `allowMissingColumns` of `unionByName` is a feature allowing merging different schema from two datasets when unioning them together. Nested column support makes the feature more general and flexible for usage.

### Does this PR introduce _any_ user-facing change?

Yes, after this change users can union two datasets with different schema with different structs.

### How was this patch tested?

Unit tests.

Closes #29587 from viirya/SPARK-32376.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2020-10-16 14:48:14 -07:00
Max Gekk acb79f52db [MINOR][SQL] Re-use binaryToSQLTimestamp() in ParquetRowConverter
### What changes were proposed in this pull request?
The function `binaryToSQLTimestamp()` is used by Parquet Vectorized reader. Parquet MR reader has similar code for de-serialization of INT96 timestamps. In this PR, I propose to de-duplicate code and re-use `binaryToSQLTimestamp()`.

### Why are the changes needed?
This should improve maintainability, and should help avoid errors when changing the vectorized and regular parquet readers.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By existing test suites, for instance `ParquetIOSuite`.

Closes #30069 from MaxGekk/int96-common-serde.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-16 14:27:27 -07:00
Dongjoon Hyun ab0bad9544 [SPARK-33171][INFRA] Mark ParquetV*FilterSuite/ParquetV*SchemaPruningSuite as ExtendedSQLTest
### What changes were proposed in this pull request?

This PR aims to mark ParquetV1FilterSuite and ParquetV2FilterSuite as `ExtendedSQLTest`.
- ParquetV1FilterSuite/ParquetV2FilterSuite
- ParquetV1SchemaPruningSuite/ParquetV2SchemaPruningSuite

### Why are the changes needed?

Currently, `sql - other tests` is the longest job. This PR will move the above tests to `sql - slow tests` job.

**BEFORE**
- https://github.com/apache/spark/runs/1264150802 (1 hour 37 minutes)

**AFTER**
- https://github.com/apache/spark/pull/30068/checks?check_run_id=1265879896 (1 hour 21 minutes)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the Github Action with the reduced time.

Closes #30068 from dongjoon-hyun/MOVE3.

Lead-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-16 12:52:45 -07:00
neko e029e891ab [SPARK-33145][WEBUI] Fix when Succeeded Jobs has many child url elements, they will extend over the edge of the page
### What changes were proposed in this pull request?
On the Execution web page, when `Succeeded Jobs` (or `Failed Jobs`) has many child URL elements, they extend over the edge of the page.

### Why are the changes needed?
To make the page more friendly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Manual test result is shown below:

![fixed](https://user-images.githubusercontent.com/52202080/95977319-50734600-0e4b-11eb-93c0-b8deb565bcd8.png)

Closes #30035 from akiyamaneko/sql_execution_job_overflow.

Authored-by: neko <echohlne@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-10-16 23:13:22 +08:00
ulysses 3ae1520185 [SPARK-33131][SQL] Fix grouping sets with having clause can not resolve qualified col name
### What changes were proposed in this pull request?

Correct the resolution of having clause.

### Why are the changes needed?

Grouping sets construct a new Aggregate and lose the qualified name of the grouping expression. Here is an example:
```
-- Works resolved by `ResolveReferences`
select c1 from values (1) as t1(c1) group by grouping sets(t1.c1) having c1 = 1

-- Works because of the extra expression c1
select c1 as c2 from values (1) as t1(c1) group by grouping sets(t1.c1) having t1.c1 = 1

-- Failed
select c1 from values (1) as t1(c1) group by grouping sets(t1.c1) having t1.c1 = 1
```

It works with `Aggregate` without grouping sets through `ResolveReferences`, but grouping sets do not work since the exprId has been changed.

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

add test.

Closes #30029 from ulysses-you/SPARK-33131.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-16 11:26:27 +00:00
xuewei.linxuewei 306872eefa [SPARK-33139][SQL] protect setActiveSession and clearActiveSession
### What changes were proposed in this pull request?

This PR is a sub-task of [SPARK-33138](https://issues.apache.org/jira/browse/SPARK-33138). In order to make SQLConf.get reliable and stable, we need to make sure users can't pollute the SQLConf and the SparkSession context by calling setActiveSession and clearActiveSession.

Changes in this PR:

* add a legacy config spark.sql.legacy.allowModifyActiveSession to fall back to the old behavior if users do need to call these two APIs.
* by default, calling these two APIs throws an exception.
* add two extra internal and private APIs, setActiveSessionInternal and clearActiveSessionInternal, for current internal usage.
* change all internal references to the new internal APIs, except for SQLContext.setActive and SQLContext.clearActive.

### Why are the changes needed?

Make SQLConf.get reliable and stable.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

* Add UT in SparkSessionBuilderSuite to test the legacy config
* Existing test

Closes #30042 from leanken/leanken-SPARK-33139.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-16 06:05:17 +00:00
Takeshi Yamamuro a5c17de241 [SPARK-33165][SQL][TEST] Remove dependencies(scalatest,scalactic) from Benchmark
### What changes were proposed in this pull request?

This PR proposes to remove `assert` from `Benchmark` to make it easier to run benchmark code via `spark-submit`.

### Why are the changes needed?

Since the current `Benchmark` (`master` and `branch-3.0`) has `assert`, we need to pass the proper jars of `scalatest` and `scalactic`;
 - scalatest-core_2.12-3.2.0.jar
 - scalatest-compatible-3.2.0.jar
 - scalactic_2.12-3.0.jar
```
./bin/spark-submit --jars scalatest-core_2.12-3.2.0.jar,scalatest-compatible-3.2.0.jar,scalactic_2.12-3.0.jar,./sql/catalyst/target/spark-catalyst_2.12-3.1.0-SNAPSHOT-tests.jar,./core/target/spark-core_2.12-3.1.0-SNAPSHOT-tests.jar --class org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark ./sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar --data-location /tmp/tpcds-sf1
```

This update lets developers submit benchmark code without these dependencies;
```
./bin/spark-submit --jars ./sql/catalyst/target/spark-catalyst_2.12-3.1.0-SNAPSHOT-tests.jar,./core/target/spark-core_2.12-3.1.0-SNAPSHOT-tests.jar --class org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark ./sql/core/target/spark-sql_2.12-3.1.0-SNAPSHOT-tests.jar --data-location /tmp/tpcds-sf1
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked.

Closes #30064 from maropu/RemoveDepInBenchmark.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-16 11:39:09 +09:00
Huaxin Gao bf594a9788 [SPARK-32402][SQL][FOLLOW-UP] Add case sensitivity tests for column resolution in ALTER TABLE
### What changes were proposed in this pull request?
Add case sensitivity tests for column resolution in ALTER TABLE

### Why are the changes needed?
To make sure `spark.sql.caseSensitive` works for `ResolveAlterTableChanges`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new test

Closes #30063 from huaxingao/caseSensitivity.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-16 11:04:35 +09:00
Max Gekk 38c05af1d5 [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files
### What changes were proposed in this pull request?
Added a couple tests to `AvroSuite` and to `ParquetIOSuite` to check that the metadata key 'org.apache.spark.legacyDateTime' is written correctly depending on the SQL configs:
- spark.sql.legacy.avro.datetimeRebaseModeInWrite
- spark.sql.legacy.parquet.datetimeRebaseModeInWrite

This is a follow up https://github.com/apache/spark/pull/28137.

### Why are the changes needed?
1. To improve test coverage
2. To make sure that the metadata key is actually saved to Avro/Parquet files

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the added tests:
```
$ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV2Suite"
```

Closes #30061 from MaxGekk/parquet-test-metakey.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-16 10:28:15 +09:00
Denis Pyshev ba69d68d91 [SPARK-33080][BUILD] Replace fatal warnings snippet
### What changes were proposed in this pull request?

The current solution in the build file, which enables build failure on compilation warnings with the exclusion of deprecation ones, is not portable past SBT 1.3.13 (the build import fails with a compilation error on SBT 1.4). It can be replaced with a more robust and maintainable approach, especially since Scala 2.13.2 ships similar built-in functionality.

Additionally, warnings were fixed to pass the build, with as few changes as possible:
- warnings in the 2.12 compilation are fixed in code,
- warnings in the 2.13 compilation are covered by configuration, to be addressed separately.

### Why are the changes needed?

Unblocks upgrade to SBT after 1.3.13.
Enhances build file maintainability.
Allows fine-tuning of the warnings configuration in the scope of the Scala 2.13 compilation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`build/sbt`'s `compile` and `Test/compile` for both Scala 2.12 and 2.13 profiles.

Closes #29995 from gemelen/feature/warnings-reporter.

Authored-by: Denis Pyshev <git@gemelen.net>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-10-15 14:49:43 -05:00
Huaxin Gao 31f7097ce0 [SPARK-32402][SQL][FOLLOW-UP] Use quoted column name for JDBCTableCatalog.alterTable
### What changes were proposed in this pull request?
I currently generate unquoted column names in ALTER TABLE, e.g. ```ALTER TABLE "test"."alt_table" DROP COLUMN c1```;
this should change to a quoted column name: ```ALTER TABLE "test"."alt_table" DROP COLUMN "c1"```

### Why are the changes needed?
We should always use quoted identifiers in JDBC SQL, e.g. ```CREATE TABLE "test"."abc" ("col" INTEGER )  ``` or ```INSERT INTO "test"."abc" ("col") VALUES (?)```. Using an unquoted column name in alterTable causes problems, for example:
```
sql("CREATE TABLE h2.test.alt_table (c1 INTEGER, c2 INTEGER) USING _")
sql("ALTER TABLE h2.test.alt_table DROP COLUMN c1")

org.apache.spark.sql.AnalysisException: Failed table altering: test.alt_table;
......

Caused by: org.h2.jdbc.JdbcSQLException: Column "C1" not found; SQL statement:
ALTER TABLE "test"."alt_table" DROP COLUMN c1 [42122-195]

```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #30041 from huaxingao/alter_table_followup.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-15 15:33:23 +00:00
manuzhang 77a8efbc05 [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command
### What changes were proposed in this pull request?
Do not use the local shuffle reader at the final stage if the root node is a write command.

### Why are the changes needed?
Users usually repartition by the partition column when doing a dynamic partition overwrite. AQE could break this by replacing the physical shuffle with a local shuffle reader, which could lead to a large number of output files, even exceeding the file system limit.
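A sketch of the pattern this change protects (spark-shell; the config is real, but the table and column names are illustrative):
```scala
// Dynamic partition overwrite with an explicit repartition by the partition column.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
val df = spark.table("events_staging")
df.repartition(df("dt"))   // one shuffle partition per date => few files per partition
  .write
  .mode("overwrite")
  .insertInto("events_partitioned_by_dt")
// If AQE converted that final shuffle into a local shuffle reader, each task could write
// to many partitions, multiplying the number of output files.
```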

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Add test.

Closes #29797 from manuzhang/spark-32932.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-15 05:53:32 +00:00
Wenchen Fan f3ad32f4b6 [SPARK-33026][SQL][FOLLOWUP] metrics name should be numOutputRows
### What changes were proposed in this pull request?

Follow the convention and rename the metric `numRows` to `numOutputRows`.

### Why are the changes needed?

`FilterExec`, `HashAggregateExec`, etc. all use `numOutputRows`

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #30039 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-14 16:17:28 +00:00
Jungtaek Lim (HeartSaVioR) 8e5cb1d276 [SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
### What changes were proposed in this pull request?

This PR proposes to fix a bug in `V2WriteCommand.outputResolved` where `DataType.equalsIgnoreCompatibleNullability` is called with mistakenly swapped parameters. The parameters of `DataType.equalsIgnoreCompatibleNullability` are `from` and `to`, in that order, which means the right order of the matched variables is `inAttr` followed by `outAttr`.
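A minimal, self-contained sketch of the nullability direction the fixed argument order enforces (not the Spark-internal code; the helper below is made up for illustration):
```scala
import org.apache.spark.sql.types._

// Writing a non-nullable input attribute into a nullable table column is compatible;
// the reverse is not. This is the direction implied by (from = inAttr, to = outAttr).
def nullabilityCompatible(in: StructField, out: StructField): Boolean =
  in.dataType == out.dataType && (out.nullable || !in.nullable)

val inAttr  = StructField("v", LongType, nullable = false)  // query output
val outAttr = StructField("v", LongType, nullable = true)   // table attribute
nullabilityCompatible(inAttr, outAttr)  // true:  the fixed order accepts this write
nullabilityCompatible(outAttr, inAttr)  // false: the swapped order rejected such valid writes
```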

### Why are the changes needed?

Spark throws an AnalysisException due to an unresolved operator in a v2 write; the operator is unresolved because the parameters passed to `DataType.equalsIgnoreCompatibleNullability` in `outputResolved` were swapped.

### Does this PR introduce _any_ user-facing change?

Yes, end users no longer hit an unresolved operator in a v2 write when writing a dataframe containing non-nullable complex types to a table whose matching complex types are nullable.

### How was this patch tested?

New UT added.

Closes #30033 from HeartSaVioR/SPARK-33136.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-14 08:30:03 -07:00
Richard Penney d8c4a47ea1 [SPARK-33061][SQL] Expose inverse hyperbolic trig functions through sql.functions API
This patch is a small extension to change-request SPARK-28133, which added inverse hyperbolic functions to the SQL interpreter, but did not include those methods within the Scala `sql.functions._` API. This patch makes `acosh`, `asinh` and `atanh` functions available through the Scala API.

Unit-tests have been added to `sql/core/src/test/scala/org/apache/spark/sql/MathFunctionsSuite.scala`. Manual testing has been done via `spark-shell`, using the following recipe:
```
val df = spark.range(0, 11)
              .toDF("x")
              .withColumn("x", ($"x" - 5) / 2.0)
val hyps = df.withColumn("tanh", tanh($"x"))
             .withColumn("sinh", sinh($"x"))
             .withColumn("cosh", cosh($"x"))
val invhyps = hyps.withColumn("atanh", atanh($"tanh"))
                  .withColumn("asinh", asinh($"sinh"))
                  .withColumn("acosh", acosh($"cosh"))
invhyps.show
```
which produces the following output:
```
+----+--------------------+-------------------+------------------+-------------------+-------------------+------------------+
|   x|                tanh|               sinh|              cosh|              atanh|              asinh|             acosh|
+----+--------------------+-------------------+------------------+-------------------+-------------------+------------------+
|-2.5| -0.9866142981514303|-6.0502044810397875| 6.132289479663686| -2.500000000000001|-2.4999999999999956|               2.5|
|-2.0| -0.9640275800758169| -3.626860407847019|3.7621956910836314|-2.0000000000000004|-1.9999999999999991|               2.0|
|-1.5| -0.9051482536448664|-2.1292794550948173| 2.352409615243247|-1.4999999999999998|-1.4999999999999998|               1.5|
|-1.0| -0.7615941559557649|-1.1752011936438014| 1.543080634815244|               -1.0|               -1.0|               1.0|
|-0.5|-0.46211715726000974|-0.5210953054937474|1.1276259652063807|               -0.5|-0.5000000000000002|0.4999999999999998|
| 0.0|                 0.0|                0.0|               1.0|                0.0|                0.0|               0.0|
| 0.5| 0.46211715726000974| 0.5210953054937474|1.1276259652063807|                0.5|                0.5|0.4999999999999998|
| 1.0|  0.7615941559557649| 1.1752011936438014| 1.543080634815244|                1.0|                1.0|               1.0|
| 1.5|  0.9051482536448664| 2.1292794550948173| 2.352409615243247| 1.4999999999999998|                1.5|               1.5|
| 2.0|  0.9640275800758169|  3.626860407847019|3.7621956910836314| 2.0000000000000004|                2.0|               2.0|
| 2.5|  0.9866142981514303| 6.0502044810397875| 6.132289479663686|  2.500000000000001|                2.5|               2.5|
+----+--------------------+-------------------+------------------+-------------------+-------------------+------------------+
```

Closes #29938 from rwpenney/fix/inverse-hyperbolics.

Authored-by: Richard Penney <rwp@rwpenney.uk>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-10-14 08:48:55 -05:00
Max Gekk 05a62dcada [SPARK-33134][SQL] Return partial results only for root JSON objects
### What changes were proposed in this pull request?
In the PR, I propose to restrict the partial result feature to root JSON objects only. The JSON datasource as well as `from_json()` will return `null` for malformed nested JSON objects.

### Why are the changes needed?
1. To not raise an exception to users in the PERMISSIVE mode
2. To fix a regression and have the same behavior as Spark 2.4.x
3. The current implementation of partial results is supposed to work only for root (top-level) JSON objects and is not tested against bad nested complex JSON fields.

### Does this PR introduce _any_ user-facing change?
Yes. Before the changes, the code below:
```scala
    val pokerhand_raw = Seq("""[{"cards": [19], "playerId": 123456}]""").toDF("events")
    val event = new StructType().add("playerId", LongType).add("cards", ArrayType(new StructType().add("id", LongType).add("rank", StringType)))
    val pokerhand_events = pokerhand_raw.select(from_json($"events", ArrayType(event)).as("event"))
    pokerhand_events.show
```
throws an exception even in the default **PERMISSIVE** mode:
```java
java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
```

After the changes:
```
+-----+
|event|
+-----+
| null|
+-----+
```

### How was this patch tested?
Added a test to `JsonFunctionsSuite`.

Closes #30031 from MaxGekk/json-skip-row-wrong-schema.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-14 12:13:54 +09:00
Prashant Sharma 304ca1ec93 [SPARK-33129][BUILD][DOCS] Updating the build/sbt references to test-only with testOnly for SBT 1.3.x
### What changes were proposed in this pull request?

Replace `test-only` with `testOnly` in docs across the project.

### Why are the changes needed?

Since the SBT version has been updated, the older way of running tests, i.e. `test-only`, is no longer valid.
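For example (the suite name is only an illustration):
```
# old SBT syntax, no longer valid:
build/sbt "test-only org.apache.spark.sql.SQLQuerySuite"
# new SBT 1.3.x syntax:
build/sbt "testOnly org.apache.spark.sql.SQLQuerySuite"
```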

### Does this PR introduce _any_ user-facing change?

docs update.

### How was this patch tested?

Manually.

Closes #30028 from ScrapCodes/fix-build/sbt-sample.

Authored-by: Prashant Sharma <prashsh1@in.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-13 09:21:06 -07:00
xuewei.linxuewei dc697a8b59 [SPARK-13860][SQL] Change statistical aggregate function to return null instead of Double.NaN when divideByZero
### What changes were proposed in this pull request?

As [SPARK-13860](https://issues.apache.org/jira/browse/SPARK-13860) states, TPCDS Query 39 returns wrong results using SparkSQL. The root cause is that when stddev_samp is applied to a single-element set, the TPCDS answer expects null, whereas SparkSQL returns Double.NaN, which causes the wrong result.

This PR adds an extra legacy config to fall back to the NaN behavior, and returns null by default to align with the TPCDS standard.

### Why are the changes needed?

SQL correctness issue.

### Does this PR introduce any user-facing change?
Yes. See sql-migration-guide

In Spark 3.1, statistical aggregation functions, including `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, and `corr`, will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` is applied to a single-element set. In Spark version 3.0 and earlier, `Double.NaN` is returned in such cases. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
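A small sketch of the behavior difference in spark-shell (the single-element input is illustrative):
```scala
// Spark 3.1 default: NULL for a single-element sample standard deviation.
Seq(1.0).toDF("x").selectExpr("stddev_samp(x)").show()   // prints null

// Restoring the pre-3.1 behavior returns NaN instead.
spark.conf.set("spark.sql.legacy.statisticalAggregate", "true")
Seq(1.0).toDF("x").selectExpr("stddev_samp(x)").show()   // prints NaN
```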

### How was this patch tested?
Updated DataFrameAggregateSuite/DataFrameWindowFunctionsSuite to test both the default and legacy behavior.
Adjusted DataFrameWindowFunctionsSuite/SQLQueryTestSuite and some R test cases to follow the new default return-null behavior.

Closes #29983 from leanken/leanken-SPARK-13860.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-13 13:21:45 +00:00
Huaxin Gao af3e2f7d58 [SPARK-33081][SQL] Support ALTER TABLE in JDBC v2 Table Catalog: update type and nullability of columns (DB2 dialect)
### What changes were proposed in this pull request?
- Override the default SQL strings in the DB2 Dialect for:

  * ALTER TABLE UPDATE COLUMN TYPE
  * ALTER TABLE UPDATE COLUMN NULLABILITY

- Add new docker integration test suite jdbc/v2/DB2IntegrationSuite.scala

### Why are the changes needed?
In SPARK-24907, we implemented the JDBC v2 Table Catalog, but it doesn't support some ALTER TABLE operations at the moment. This PR adds support for DB2-specific ALTER TABLE.
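For illustration, the kind of statements this enables end-to-end; the `db2` catalog, table, and column names are hypothetical and assume a JDBC v2 catalog backed by DB2 is configured:
```scala
// Spark DDL that the JDBC v2 catalog translates into DB2-specific ALTER TABLE SQL.
sql("ALTER TABLE db2.test.alt_table ALTER COLUMN deptno TYPE DOUBLE")    // update column type
sql("ALTER TABLE db2.test.alt_table ALTER COLUMN deptno DROP NOT NULL")  // update nullability
```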

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By running new integration test suite:

$ ./build/sbt -Pdocker-integration-tests "test-only *.DB2IntegrationSuite"

Closes #29972 from huaxingao/db2_docker.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-13 12:57:54 +00:00
Chao Sun feee8da14b [SPARK-32858][SQL] UnwrapCastInBinaryComparison: support other numeric types
### What changes were proposed in this pull request?

In SPARK-24994 we implemented unwrapping cast for **integral types**. This extends it to support **numeric types** such as float/double/decimal, so that filters involving these types can be better pushed down to data sources.

Unlike the integral-type cases, conversions between numeric types can result in rounding up or down. Consider the following case:

```sql
cast(e as double) < 1.9
```

Assume the type of `e` is short. Since 1.9 is not representable in that type, the cast will either truncate or round. Now, supposing the literal is truncated, we cannot convert the expression to:

```sql
e < cast(1.9 as short)
```

as in the previous implementation, since if `e` is 1, the original expression evaluates to true but the converted expression would evaluate to false.

To resolve the above, this PR first finds out whether casting from the wider type to the narrower type will truncate or round, by comparing a _roundtrip value_, derived from **converting the literal first to the narrower type and then back to the wider type**, against the original literal value. For instance, in the above, we'll first obtain a roundtrip value via the conversion (double) 1.9 -> (short) 1 -> (double) 1.0, and then compare it against 1.9.

<img width="1153" alt="Screen Shot 2020-09-28 at 3 30 27 PM" src="https://user-images.githubusercontent.com/506679/94492719-bd29e780-019f-11eb-9111-71d6e3d157f7.png">

Now in the case of truncate, we'd convert the original expression to:
```sql
e <= cast(1.9 as short)
```
instead, so that the conversion also is valid when `e` is 1.

For more details, please check [this blog post](https://prestosql.io/blog/2019/05/21/optimizing-the-casts-away.html) by Presto which offers a very good explanation on how it works.
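A minimal Scala sketch of the roundtrip check described above (illustrative only, not the actual rule implementation):
```scala
val lit: Double = 1.9
val narrowed: Short = lit.toShort          // (short) 1  -- the value is truncated
val roundtrip: Double = narrowed.toDouble  // (double) 1.0
val truncatedDown = roundtrip < lit        // true => rewrite `cast(e as double) < 1.9`
                                           //         as `e <= cast(1.9 as short)`
```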

### Why are the changes needed?

For queries such as:
```sql
SELECT * FROM tbl WHERE short_col < 100.5
```
The predicate `short_col < 100.5` can't be pushed down to data sources because it involves casts. This eliminates the cast so these queries can run more efficiently.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #29792 from sunchao/SPARK-32858.

Lead-authored-by: Chao Sun <sunchao@apple.com>
Co-authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-13 12:44:20 +00:00
Yuming Wang e34f2d8df2 [SPARK-33119][SQL] ScalarSubquery should returns the first two rows to avoid Driver OOM
### What changes were proposed in this pull request?

`ScalarSubquery` should return only the first two rows.

### Why are the changes needed?

To avoid a driver OOM: a scalar subquery must produce at most one row, so fetching only the first two rows is enough to detect the multi-row error case without collecting the whole subquery result on the driver.
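A sketch of the idea (not the internal implementation; the DataFrame below just stands in for the executed subquery plan):
```scala
// Taking two rows is enough to distinguish "exactly one row" from "more than one row"
// without pulling the whole result to the driver.
val subqueryResult = spark.range(1).toDF("value")
val rows = subqueryResult.take(2)
require(rows.length <= 1,
  "more than one row returned by a subquery used as an expression")
val scalarValue = rows.headOption.map(_.get(0)).orNull
```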

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test: d6f3138352/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala (L147-L154)

Closes #30016 from wangyum/SPARK-33119.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-10-13 17:41:55 +09:00
Pablo 819f12ee2f [SPARK-33118][SQL] CREATE TEMPORARY TABLE fails with location
### What changes were proposed in this pull request?

There is a problem when CREATE TEMPORARY TABLE is used with LOCATION:

```scala
spark.range(3).write.parquet("/tmp/testspark1")

sql("CREATE TEMPORARY TABLE t USING parquet OPTIONS (path '/tmp/testspark1')")
sql("CREATE TEMPORARY TABLE t USING parquet LOCATION '/tmp/testspark1'")
```
```scala
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$12(DataSource.scala:200)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:200)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
  at org.apache.spark.sql.execution.datasources.CreateTempViewUsing.run(ddl.scala:94)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
```
This bug was introduced by SPARK-30507.
In the SQL parser, the chain visitCreateTable --> visitCreateTableClauses --> cleanTableOptions extracts the path from the options, but in this case CreateTempViewUsing needs the path to remain in the options map.

### Why are the changes needed?

To fix the problem

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit testing and manual testing

Closes #30014 from planga82/bugfix/SPARK-33118_create_temp_table_location.

Authored-by: Pablo <pablo.langa@stratio.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-10-12 14:18:34 -07:00
xuewei.linxuewei b27a287ff2 [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on
### What changes were proposed in this pull request?

In the following scenario, SQLMetrics could be incorrect when AQE is on:

1. Stages A and B are created, and the UI is updated through the onAdaptiveExecutionUpdate event.
2. Stages A and B are running. A subquery in stage A keeps updating metrics through the onAdaptiveSQLMetricUpdate event.
3. Stage B completes, while stage A's subquery is still running and updating metrics.
4. Completion of stage B triggers new stage creation and a UI update through the onAdaptiveExecutionUpdate event again (just like step 1).

So we decided to make a trade-off: keep possibly duplicated SQLMetrics, without deleting them, when AQE updates the plan.

### Why are the changes needed?

Make SQLMetrics behavior 100% correct.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Updated SQLAppStatusListenerSuite.

Closes #29965 from leanken/leanken-SPARK-33016.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-12 14:48:40 +00:00