Commit graph

11652 commits

Kousuke Saruta e17612d0bf Revert "[SPARK-36429][SQL] JacksonParser should throw exception when data type unsupported"
### What changes were proposed in this pull request?

This PR reverts the change in SPARK-36429 (#33654).
See [conversation](https://github.com/apache/spark/pull/33654#issuecomment-894160037).

### Why are the changes needed?

To recover CIs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #33670 from sarutak/revert-SPARK-36429.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-08-06 20:56:24 +09:00
gaoyajun02 888f8f03c8 [SPARK-36339][SQL] References to grouping that not part of aggregation should be replaced
### What changes were proposed in this pull request?

Currently, references to grouping sets are reported as errors after aggregated expressions, e.g.
```
SELECT count(name) c, name
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);
```
fails with:
```
Error in query: expression 'people.`name`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
```

### Why are the changes needed?

The anonymous map function in the `constructAggregateExprs` function does not use underscores, so references to grouping columns that are not part of the aggregation are not replaced; this PR fixes that.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes #33574 from gaoyajun02/SPARK-36339.

Lead-authored-by: gaoyajun02 <gaoyajun02@gmail.com>
Co-authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 16:34:37 +08:00
ulysses-you c97fb68885 [SPARK-35221][SQL] Add the check of supported join hints
### What changes were proposed in this pull request?

Print warning msg if join hint is not supported for the specified build side.

### Why are the changes needed?

Currently we support specifying the join implementation with hints, but Spark does not guarantee it.

For example, for broadcast outer join and hash outer join we need to check whether the build side is supported. At the very least, we should print a warning log instead of silently switching to another join implementation.
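
As a hedged illustration (not taken from this PR's tests), a broadcast hint on the build side of a full outer join is one case where the requested implementation cannot be used, so a warning would now be logged:

```scala
// Hedged sketch: broadcast hash join cannot build either side of a full outer join,
// so the hint below cannot be honored; with this change Spark logs a warning instead
// of silently choosing another join implementation.
val left  = spark.range(100).toDF("id")
val right = spark.range(100).toDF("id")
left.join(right.hint("broadcast"), Seq("id"), "full_outer").explain()
```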

### Does this PR introduce _any_ user-facing change?

Yes, warning log might be printed.

### How was this patch tested?

Add new test.

Closes #32355 from ulysses-you/SPARK-35221.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 15:29:43 +08:00
Kousuke Saruta 63c7d1847d [SPARK-36429][SQL][FOLLOWUP] Update a golden file to comply with the change in SPARK-36429
### What changes were proposed in this pull request?

This PR updates a golden file to comply with the change in SPARK-36429 (#33654).

### Why are the changes needed?

To recover GA failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA itself.

Closes #33663 from sarutak/followup-SPARK-36429.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 15:20:54 +08:00
gengjiaan eb12727bc7 [SPARK-36429][SQL] JacksonParser should throw exception when data type unsupported
### What changes were proposed in this pull request?
Currently, when `spark.sql.timestampType=TIMESTAMP_NTZ` is set, the behavior differs between `from_json` and `from_csv`.
```
-- !query
select from_json('{"t":"26/October/2015"}', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<from_json({"t":"26/October/2015"}):struct<t:timestamp_ntz>>
-- !query output
{"t":null}
```

```
-- !query
select from_csv('26/October/2015', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<>
-- !query output
java.lang.Exception
Unsupported type: timestamp_ntz
```

We should make `from_json` throw an exception too.
This PR addresses the discussion below:
https://github.com/apache/spark/pull/33640#discussion_r682862523

### Why are the changes needed?
Make the behavior of `from_json` more reasonable.

### Does this PR introduce _any_ user-facing change?
Yes.
`from_json` now throws an exception when `spark.sql.timestampType=TIMESTAMP_NTZ` is set.

### How was this patch tested?
Tests updated.

Closes #33654 from beliefer/SPARK-36429.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 12:53:04 +08:00
Kent Yao c7fa3c9090 [SPARK-36421][SQL][DOCS] Use ConfigEntry.key to fix docs and set command results
### What changes were proposed in this pull request?

This PR fixes the issue that `ConfigEntry` objects are interpolated into the doc field directly without calling `.key`, which causes malformed documentation on the website and in the result of `SET -v`:

1. https://spark.apache.org/docs/3.1.2/configuration.html#static-sql-configuration - spark.sql.hive.metastore.jars

2. set -v
![image](https://user-images.githubusercontent.com/8326978/128292412-85100f95-24fd-4b40-a14f-d31a256dab7d.png)
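
To illustrate the class of bug (a self-contained toy sketch, not Spark's internal `ConfigEntry`), interpolating the entry object instead of its `.key` produces malformed text:

```scala
// Toy stand-in for illustration only; Spark's real ConfigEntry lives in its internal config package.
object ConfigDocExample {
  final case class ConfigEntry(key: String, default: String) {
    override def toString: String = s"ConfigEntry(key=$key, default=$default)" // ugly inside doc text
  }

  def main(args: Array[String]): Unit = {
    val metastoreJars = ConfigEntry("spark.sql.hive.metastore.jars", "builtin")
    // Wrong: embeds the whole object, producing malformed docs and SET -v output.
    val badDoc = s"Set $metastoreJars to 'maven' to download jars from Maven repositories."
    // Right: embed only the key, which is what calling .key achieves in this PR.
    val goodDoc = s"Set ${metastoreJars.key} to 'maven' to download jars from Maven repositories."
    println(badDoc)
    println(goodDoc)
  }
}
```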

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no, but contains doc fix
### How was this patch tested?

new tests

Closes #33647 from yaooqinn/SPARK-36421.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 11:01:47 +09:00
Kent Yao 0c94e47aec [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE
### What changes were proposed in this pull request?

This reverts SPARK-31475, as there are typically more concurrent jobs running in AQE mode, especially when running multiple queries at the same time. Currently, the broadcast timeout is not measured for the BroadcastQueryStageExec alone; it also includes the time spent waiting to be scheduled. If all the resources are currently occupied materializing other stages, it times out without ever getting a chance to actually run.

 

![image](https://user-images.githubusercontent.com/8326978/128169612-4c96c8f6-6f8e-48ed-8eaf-450f87982c3b.png)

 

The default value is 300s, and it's hard to tune the timeout for AQE mode; real-world cases usually need an extremely large value. As you can see in the example above, the timeout we used was 1800s, and it still needed roughly 3x more.
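
For reference, a minimal sketch of the relevant settings (values are illustrative); after this change the timeout below no longer applies to broadcast query stages under AQE:

```scala
// Illustrative values only; with this change, AQE broadcast query stages ignore the timeout.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.broadcastTimeout", "1800") // previously could fire while the stage was still waiting to be scheduled
```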

 

### Why are the changes needed?

AQE is enabled by default now; this PR makes it more stable.

### Does this PR introduce _any_ user-facing change?

Yes, the broadcast timeout is no longer used in AQE mode.

### How was this patch tested?

modified test

Closes #33636 from yaooqinn/SPARK-36414.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-05 21:15:35 +08:00
Wenchen Fan 095f9ff75b [SPARK-36409][SQL][TESTS] Splitting test cases from datetime.sql
### What changes were proposed in this pull request?

Currently `datetime.sql` contains a lot of tests and will be run 3 times: default mode, ANSI mode, and ntz mode. This wastes test time, and large test files are also hard to read.

This PR proposes to split it into smaller ones:
1. `date.sql`, which contains date literals, functions and operations. It will be run twice with default and ansi mode.
2. `timestamp.sql`, which contains timestamp (no ltz or ntz suffix) literals, functions and operations. It will be run 4 times: default mode + ANSI off, default mode + ANSI on, ntz mode + ANSI off, ntz mode + ANSI on.
3. `datetime_special.sql`, which creates datetime values whose year is outside of [0, 9999]. This is a separate file as JDBC doesn't support such values and needs to ignore this test file. It will be run 4 times as well.
4. `timestamp_ltz.sql`, which contains timestamp_ltz literals and constructors. It will be run twice with default and ntz mode, to make sure its result doesn't change with the timestamp mode. Note that operations with ltz are tested by `timestamp.sql`.
5. `timestamp_ntz.sql`, which contains timestamp_ntz literals and constructors. It will be run twice with default and ntz mode, to make sure its result doesn't change with the timestamp mode. Note that operations with ntz are tested by `timestamp.sql`.

### Why are the changes needed?

reduce test run time.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #33640 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-05 20:55:32 +08:00
Angerszhuuuu 02810eecbf [SPARK-36353][SQL] RemoveNoopOperators should keep output schema
### What changes were proposed in this pull request?
 RemoveNoopOperators should keep output schema

### Why are the changes needed?
Expand function

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed.

Closes #33587 from AngersZhuuuu/SPARK-36355.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-05 20:43:35 +08:00
yangjie01 01cf6f4c6b [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache
### What changes were proposed in this pull request?
There are 3 ways Guava cache is used in Spark code:

1. `LoadingCache` is the main way to use Guava cache in Spark code and the key usages are as follows:
  a. `LoadingCache` with a `maximumSize` data eviction policy, such as `appCache` in `ApplicationCache` and `cache` in `CodeGenerator`
  b. `LoadingCache` with a `maximumWeight` data eviction policy, such as `shuffleIndexCache` in `ExternalShuffleBlockResolver`
  c. `LoadingCache` with an `expireAfterWrite` data eviction policy, such as `tableRelationCache` in `SessionCatalog`
2. `ManualCache` is another way to use Guava cache in Spark code and the key usage is `cache` in `SharedInMemoryCache`, which is used to cache partition file statuses in memory

3. The last usage is `hadoopJobMetadata` in `SparkEnv`, which uses Guava Cache to build a soft-reference map.

The goal of this PR is to use `Caffeine` instead of `Guava Cache`, because `Caffeine` is faster than `Guava Cache` in benchmarks. The main changes are as follows:

1. Add the `Caffeine` dependency to the Maven `pom.xml`

2. Use `Caffeine` instead of Guava `LoadingCache`, `ManualCache` and the soft-reference map in `SparkEnv` (a minimal sketch follows below)

3. Add `LocalCacheBenchmark` to compare the performance of `LoadingCache` between `Guava Cache` and `Caffeine`
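
A minimal sketch of the replacement pattern, assuming the Caffeine dependency added above (the loader logic is illustrative):

```scala
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

// A LoadingCache with a maximumSize eviction policy, analogous to the Guava usages listed above.
val cache: LoadingCache[String, String] =
  Caffeine.newBuilder()
    .maximumSize(1000)
    .build[String, String](new CacheLoader[String, String] {
      override def load(key: String): String = key.toUpperCase // the expensive computation goes here
    })

val value = cache.get("spark") // computed once, then served from the cache
```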

### Why are the changes needed?
`Caffeine` is faster than `Guava Cache` in benchmarks.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action
- Add `LocalCacheBenchmark` to compare the performance of `LoadingCache` between `Guava Cache` and `Caffeine`

Closes #31517 from LuciferYang/guava-cache-to-caffeine.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Holden Karau <hkarau@netflix.com>
2021-08-04 12:01:44 -07:00
Cheng Su de62b5ae32 [SPARK-36404][SQL] Support ORC nested column vectorized reader for data source v2
### What changes were proposed in this pull request?

We added support for nested columns in the ORC vectorized reader for data source v1. Data source v2 and v1 both use the same underlying implementation for the vectorized reader (`OrcColumnVector`), so we can support data source v2 as well.
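
A hedged usage sketch (the config names and path below are assumptions for illustration, not taken from this PR):

```scala
// Assumed config names: enable the nested-column vectorized ORC reader and route ORC through data source v2.
spark.conf.set("spark.sql.orc.enableNestedColumnVectorizedReader", "true")
spark.conf.set("spark.sql.sources.useV1SourceList", "") // empty list => built-in file sources use v2

val df = spark.read.orc("/tmp/nested_orc") // e.g. schema: struct<point: struct<x: int, y: int>>
df.select("point.x", "point.y").show()
```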

### Why are the changes needed?

Improve query performance for ORC data source v2 when reading nested columns.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added test in `OrcQuerySuite.scala`.

Closes #33626 from c21/orc-v2.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-03 19:55:14 -07:00
PengLei 87d49cbcb1 [SPARK-36381][SQL] Add case sensitive and case insensitive compare for checking column name exist when alter table
### What changes were proposed in this pull request?
Add a Resolver to `checkColumnNotExists` so that the column-name existence check respects case sensitivity.

### Why are the changes needed?
Currently the resolver used by `findNestedField` (called by `checkColumnNotExists`) is `_ == _`. This PR passes `alter.conf.resolver` to it instead.
[SPARK-36381](https://issues.apache.org/jira/browse/SPARK-36381)
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.

Closes #33618 from Peng-Lei/sensitive-cloumn-name.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-04 10:04:13 +09:00
Yuming Wang 4a6afb4875 [SPARK-36280][SQL] Remove redundant aliases after RewritePredicateSubquery
### What changes were proposed in this pull request?

Remove redundant aliases after `RewritePredicateSubquery`. For example:
```scala
sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")
sql(
  """
    |SELECT *
    |FROM  t1
    |WHERE  a IN (SELECT x
    |  FROM  (SELECT x AS x,
    |           Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |    FROM   t2
    |    GROUP  BY x) tmp1
    |  WHERE  ranking <= 5)
    |""".stripMargin).explain
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#68]
      +- Project [x#7L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#62]
                     +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                        +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                           +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                              +- FileScan parquet default.t2[x#15L,y#16L]
```

After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#67]
      +- Project [x#15L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                     +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                        +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                           +- FileScan parquet default.t2[x#15L,y#16L]
```

### Why are the changes needed?

Reduce shuffle to improve query performance. This change can benefit TPC-DS q70.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33509 from wangyum/SPARK-36280.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-03 13:56:59 -07:00
Max Gekk 67cbc93263 [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
### What changes were proposed in this pull request?
In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in file-based datasource or writing a dataset to such a datasource. In particular, add the following case:
```scala
case _: DayTimeIntervalType | _: YearMonthIntervalType => false
```
to all methods that override either:
- V2 `FileTable.supportsDataType()`
- V1 `FileFormat.supportDataType()`
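
A hedged sketch of what is now rejected at analysis time (the path and column name are illustrative):

```scala
import org.apache.spark.sql.functions.expr

// An ANSI year-month interval column; writing it to a built-in file source now fails during analysis.
val df = spark.range(3).select(expr("INTERVAL '1-2' YEAR TO MONTH").as("ym"))
df.write.mode("overwrite").parquet("/tmp/interval_out") // throws AnalysisException with this change
```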

### Why are the changes needed?
To improve user experience with Spark SQL, and output a proper error message at the analysis phase.

### Does this PR introduce _any_ user-facing change?
Yes, but ANSI interval types haven't been released yet, so for users this is new behavior.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
```

Closes #33580 from MaxGekk/interval-ban-in-ds.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 20:30:20 +03:00
Kousuke Saruta 92cdb17d1a [SPARK-35815][SQL][FOLLOWUP] Add test considering the case spark.sql.legacy.interval.enabled is true
### What changes were proposed in this pull request?

This PR adds a test covering the case where `spark.sql.legacy.interval.enabled` is `true` for SPARK-35815.

### Why are the changes needed?

SPARK-35815 (#33456) changed `Dataset.withWatermark` to accept ANSI interval literals as `delayThreshold`, but I noticed the change didn't work with `spark.sql.legacy.interval.enabled=true`.
We couldn't detect this issue because there was no test that considered the legacy interval type at that time.
In SPARK-36323 (#33551) this issue was resolved, but it's better to add a test.
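
A minimal sketch of the scenario the new test covers (the rate source is used only for illustration):

```scala
// With the legacy interval type enabled, withWatermark should still accept an ANSI interval literal.
spark.conf.set("spark.sql.legacy.interval.enabled", "true")
val events = spark.readStream.format("rate").load() // columns: timestamp, value
val withWatermark = events.withWatermark("timestamp", "INTERVAL '10' SECOND")
```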

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33606 from sarutak/test-watermark-with-legacy-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 13:48:41 +03:00
Wenchen Fan dd80457ffb [SPARK-36315][SQL] Only skip AQEShuffleReadRule in the final stage if it breaks the distribution requirement
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/30494

This PR proposes a new way to optimize the final query stage in AQE. We first collect the effective user-specified repartition (semantic-wise, user-specified repartition is only effective if it's the root node or under a few simple nodes), and get the required distribution for the final plan. When we optimize the final query stage, we skip certain `AQEShuffleReadRule` if it breaks the required distribution.
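
A minimal sketch of an "effective" user-specified repartition in this sense (the path is illustrative): it sits at the root of the query, just under the write, so the final-stage optimization must not break the distribution it requests.

```scala
import spark.implicits._

// The repartition below imposes a required distribution for the final stage;
// AQE shuffle-read rules that would break it are now skipped.
val data = spark.range(0, 1000000).withColumn("bucket", $"id" % 16)
data.repartition($"bucket")
  .write.mode("overwrite").parquet("/tmp/aqe_repartition_out") // illustrative path
```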

### Why are the changes needed?

The current solution for optimizing the final query stage is pretty hacky and overkill. As an example, the newly added rule `OptimizeSkewInRebalancePartitions` can hardly apply as it's very common that the query plan has shuffles with origin `ENSURE_REQUIREMENTS`, which is not supported by `OptimizeSkewInRebalancePartitions`.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated tests

Closes #33541 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-03 18:28:52 +08:00
Wenchen Fan 7cb9c1c241 [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
### What changes were proposed in this pull request?

This a followup of the recent work such as https://github.com/apache/spark/pull/33200

For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule for the `ALTER TABLE ... COLUMN` commands.

This PR also moves these AlterTable commands to an individual file and gives them a base trait.

### Why are the changes needed?

name simplification

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #33609 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 10:43:00 +03:00
Yuming Wang c20af53580 [SPARK-36373][SQL] DecimalPrecision only add necessary cast
### What changes were proposed in this pull request?

This PR makes `DecimalPrecision` add only the necessary casts, similar to [`ImplicitTypeCasts`](96c2919988/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala (L675-L678)). For example:
```
EqualTo(AttributeReference("d1", DecimalType(5, 2))(), AttributeReference("d2", DecimalType(2, 1))())
```
It will add a useless cast to _d1_:
```
(cast(d1#6 as decimal(5,2)) = cast(d2#7 as decimal(5,2)))
```

### Why are the changes needed?

1. Avoid adding an unnecessary cast, even though it would be removed by `SimplifyCasts` later.
2. I'm trying to add an extended rule similar to `PullOutGroupingExpressions`. The current behavior would introduce an additional alias, for example: `cast(d1 as decimal(5,2)) as cast_d1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33602 from wangyum/SPARK-36373.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-08-03 08:12:54 +08:00
Chao Sun 7a27f8a07f [SPARK-36137][SQL] HiveShim should fallback to getAllPartitionsOf even if directSQL is enabled in remote HMS
### What changes were proposed in this pull request?

Change `HiveShim.getPartitionsByFilter` to always fall back to `getAllPartitionsMethod`, even if `hive.metastore.try.direct.sql` is set to true in the remote HMS.

### Why are the changes needed?

At the moment `getPartitionsByFilter` in `HiveShim` only falls back to `getAllPartitionsMethod` when `hive.metastore.try.direct.sql` is disabled in the remote HMS, and fails the query otherwise. However, in certain cases the remote HMS will fall back to ORM (which only supports string-typed partition columns) to query the underlying RDBMS **even if this config is set to true**. In this scenario, Spark currently cannot recover from the exception and just fails the query.

For instance, we encountered this bug [HIVE-21497](https://issues.apache.org/jira/browse/HIVE-21497) in an HMS running Hive 3.1.2, and Spark was not able to push down the filter for a date column.

### Does this PR introduce _any_ user-facing change?

Yes, now if Spark queries partitions from a remote HMS that throws an exception even though `hive.metastore.try.direct.sql` is set to true, Spark will fall back to listing all partitions and do the pruning on the client side, instead of failing the query.

### How was this patch tested?

Tested locally with a HMS instance running 3.1.2. It's pretty hard to add a unit test for this since we don't have a mock HMS.

Closes #33382 from sunchao/SPARK-36137-direct-sql.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-02 16:48:43 -07:00
Hyukjin Kwon 0bbcbc6508 [SPARK-36379][SQL] Null at root level of a JSON array should not fail w/ permissive mode
### What changes were proposed in this pull request?

This PR proposes to fail properly so the JSON parser can proceed and parse the input with permissive mode.
Previously, we passed `null`s through as-is, the root `InternalRow`s became `null`, and this caused the query to fail even with permissive mode on.
Now, we fail explicitly when the input array contains `null`.

Note that this is consistent with non-array JSON input:

**Permissive mode:**

```scala
spark.read.json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([str], [null])
```

**Failfast mode**:

```scala
spark.read.option("mode", "failfast").json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

### Why are the changes needed?

To make the permissive mode to proceed and parse without throwing an exception.

### Does this PR introduce _any_ user-facing change?

**Permissive mode:**

```scala
spark.read.json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:

```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:

```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```

Note that this behavior is consistent with the case where a JSON object is malformed:

```scala
spark.read.schema("a int").json(Seq("""[{"a": 123}, {123123}, {"a": 123}]""").toDS).collect()
```

```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```

Since we're parsing _one_ JSON array, related records all fail together.

**Failfast mode:**

```scala
spark.read.option("mode", "failfast").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:

```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:

```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

### How was this patch tested?

Manually tested, and unit test was added.

Closes #33608 from HyukjinKwon/SPARK-36379.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-02 10:01:12 -07:00
Angerszhuuuu f3173956cb [SPARK-36086][SQL] CollapseProject project replace alias should use origin column name
### What changes were proposed in this pull request?
Without this patch, the added UT fails as below:
```
[info] - SHOW TABLES V2: SPARK-36086: CollapseProject project replace alias should use origin column name *** FAILED *** (4 seconds, 935 milliseconds)
[info]   java.lang.RuntimeException: After applying rule org.apache.spark.sql.catalyst.optimizer.CollapseProject in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken.
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.structuralIntegrityIsBrokenAfterApplyingRuleError(QueryExecutionErrors.scala:1217)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:229)
[info]   at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
[info]   at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
[info]   at scala.collection.immutable.List.foldLeft(List.scala:91)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
```

`CollapseProject`'s alias replacement should use the original column name.
### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33576 from AngersZhuuuu/SPARK-36086.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-03 00:08:13 +08:00
Linhong Liu 2f700773c2 [SPARK-36224][SQL] Use Void as the type name of NullType
### What changes were proposed in this pull request?
Change `NullType.simpleString` to "void" to make "void" the formal type name of `NullType`.

### Why are the changes needed?
This PR is intended to address the type name discussion in PR #28833. Here are the reasons:
1. The type name of NullType is displayed everywhere, e.g. in schema strings, error messages, and documentation. Hence it's not possible to hide it from users, and we have to choose a proper name.
2. "void" is widely used as the type name of "NULL", e.g. in Hive and pgSQL.
3. Changing to "void" enables the round trip of `toDDL`/`fromDDL` for NullType (i.e. it makes `from_json(col, schema.toDDL)` work).

### Does this PR introduce _any_ user-facing change?
Yes, the type name of "NULL" is changed from "null" to "void". for example:
```
scala> sql("select null as a, 1 as b").schema.catalogString
res5: String = struct<a:void,b:int>
```

### How was this patch tested?
existing test cases

Closes #33437 from linhongliu-db/SPARK-36224-void-type-name.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-02 23:19:54 +08:00
Terry Kim 3b713e7f61 [SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check duplicates for the user specified columns
### What changes were proposed in this pull request?

Currently, v2 ALTER TABLE ADD COLUMNS does not check duplicates for the user specified columns. For example,
```
spark.sql(s"CREATE TABLE $t (id int) USING $v2Format")
spark.sql("ALTER TABLE $t ADD COLUMNS (data string, data string)")
```
doesn't fail analysis; it's left to the catalog implementation to handle. For the v1 command, the duplication is checked before invoking the catalog.

### Why are the changes needed?

To check the duplicate columns during analysis and be consistent with v1 command.

### Does this PR introduce _any_ user-facing change?

Yes, now the above command will print out the following:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the user specified columns: `data`
```

### How was this patch tested?

Added new unit tests

Closes #33600 from imback82/alter_add_duplicate_columns.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-02 17:54:50 +08:00
Dongjoon Hyun 22c49226f7 [SPARK-36362][CORE][SQL][FOLLOWUP] Fix java linter errors
### What changes were proposed in this pull request?

This is a follow-up of #33594 to fix the Java linter error.

### Why are the changes needed?

To recover GitHub Action.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GitHub Action.

Closes #33601 from dongjoon-hyun/SPARK-36362.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-01 21:40:03 +09:00
Sean Owen 72615bc551 [SPARK-36362][CORE][SQL][TESTS] Omnibus Java code static analyzer warning fixes
### What changes were proposed in this pull request?

Fix up some minor Java issues:

- Some int*int multiplications that widen to long and might overflow
- Unnecessarily non-static inner classes
- Some tests "catch (AssertionError)" and do nothing
- Manual array iteration vs very slightly faster/simpler foreach
- Incorrect generic types that just happen to not cause a runtime error
- Missed opportunities for try-close
- Mutable enums
- .. and a few other minor things

### Why are the changes needed?

Some are minor but clear fixes; some may have a marginal perf impact or avoid a bug later. Also: maybe avoid future PRs to address these one by one.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests

Closes #33594 from srowen/SPARK-36362.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-07-31 22:35:57 -07:00
Enrico Minack a65eb36bae [SPARK-36319][SQL][PYTHON] Make Observation return Map instead of Row
### What changes were proposed in this pull request?
The Observation API (Scala, Java, PySpark) now returns a `Map` / `Dict`. Before, it returned `Row` simply because the metrics are (internal to Observation) retrieved from the listener as rows. Since that is hidden from the user by the Observation API, there is no need to return `Row`.

While touching this code, this moves the unit tests from `DataFrameSuite.scala` to `DatasetSuite.scala` and from `JavaDataFrameSuite.java` to `JavaDatasetSuite.java`, which is a better place.

### Why are the changes needed?
This simplifies the API and accessing the metrics, especially in Java. There is no need for the concept `Row` when retrieving the observation result.

### Does this PR introduce _any_ user-facing change?
Yes, it changes the return type of `get` from `Row` to `Map` (Scala) / `Dict` (Python) and introduces `getAsJavaMap` (Java).
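
A hedged Scala sketch of the API after this change (the column names are illustrative):

```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{col, count, lit, max}

val observation = Observation("my_metrics")
val observed = spark.range(100).toDF("id")
  .observe(observation, count(lit(1)).as("rows"), max(col("id")).as("max_id"))
observed.collect()                              // metrics become available after an action
val metrics: Map[String, Any] = observation.get // now a Map instead of a Row
```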

### How was this patch tested?
This is tested in `DatasetSuite.SPARK-34806: observation on datasets`, `JavaDatasetSuite.testObservation` and `test_dataframe.test_observe`.

Closes #33545 from EnricoMi/branch-observation-returns-map.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-01 10:40:28 +09:00
Andy Grove 0f538402fb [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec
### What changes were proposed in this pull request?

Changes in this PR:

- `AdaptiveSparkPlanExec` has new methods `finalPlanSupportsColumnar` and `doExecuteColumnar` to support adaptive queries where the final query stage produces columnar data.
- `SessionState` now has a new set of injectable rules named `finalQueryStagePrepRules` that can be applied to the final query stage.
- `AdaptiveSparkPlanExec` can now safely be wrapped by either `RowToColumnarExec` or `ColumnarToRowExec`.

A Spark plugin can use the new rules to remove the root `ColumnarToRowExec` transition that is inserted by previous rules and at execution time can call `finalPlanSupportsColumnar` to see if the final query stage is columnar. If the plan is columnar then the plugin can safely call `doExecuteColumnar`. The adaptive plan can be wrapped in either `RowToColumnarExec` or `ColumnarToRowExec` to force a particular output format. There are fast paths in both of these operators to avoid any redundant transitions.

### Why are the changes needed?

Without this change it is necessary to use reflection to get the final physical plan to determine whether it is columnar and to execute it as a columnar plan. `AdaptiveSparkPlanExec` only provides public methods for row-based execution.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I have manually tested this patch with the RAPIDS Accelerator for Apache Spark.

Closes #33140 from andygrove/support-columnar-adaptive.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-07-30 13:21:50 -05:00
Hyukjin Kwon c6140d4d0a [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
### What changes were proposed in this pull request?

This PR proposes to implement `distributed-sequence` index in Scala side.

### Why are the changes needed?

- Avoid unnecessary (de)serialization
- Keep the nullability in the input DataFrame when `distributed-sequence` is enabled. During the serialization, all fields are being nullable for now (see https://github.com/apache/spark/pull/32775#discussion_r645882104)

### Does this PR introduce _any_ user-facing change?

No to end users since pandas API on Spark is not released yet.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(1).spark.print_schema()
```

Before:

```
root
 |-- id: long (nullable = true)
```

After:

```
root
 |-- id: long (nullable = false)
```

### How was this patch tested?

Manually tested, and existing tests should cover them.

Closes #33570 from HyukjinKwon/SPARK-36338.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-30 22:29:23 +09:00
Wenchen Fan 387a251a68 [SPARK-34952][SQL][FOLLOWUP] Simplify JDBC aggregate pushdown
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33352 , to simplify the JDBC aggregate pushdown:
1. We should get the schema of the aggregate query by asking the JDBC server, instead of calculating it ourselves. This simplifies the code a lot and is also more robust: the data type of SUM may vary across databases, and it's fragile to assume it is always the same as Spark's.
2. Because of 1, we can now remove the `dataType` property from the public `Sum` expression.

This PR also contains some small improvements:
1. Spark should deduplicate the aggregate expressions before pushing them down.
2. Improve the `toString` of public aggregate expressions to make them more SQL-like.

### Why are the changes needed?

code and API simplification

### Does this PR introduce _any_ user-facing change?

this API is not released yet.

### How was this patch tested?

existing tests

Closes #33579 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-30 00:26:32 -07:00
Chao Sun 0ece865ea4 [SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package
### What changes were proposed in this pull request?

Move both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` to the package `org.apache.spark.sql.execution.datasources`. Some refactoring was done to enable this.

### Why are the changes needed?

Currently both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` are in package `org.apache.spark.sql.hive.execution` which doesn't look correct as these tests are not specific to Hive. Therefore, it's better to move them into `org.apache.spark.sql.execution.datasources`, the same place where the rule `PruneFileSourcePartitions` is at.

### Does this PR introduce _any_ user-facing change?

No, it's just test refactoring.

### How was this patch tested?

Using existing tests:
```
build/sbt "sql/testOnly *PruneFileSourcePartitionsSuite"
```
and
```
build/sbt "hive/testOnly *PruneHiveTablePartitionsSuite"
```

Closes #33564 from sunchao/SPARK-36136-partitions-suite.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-29 17:18:23 -07:00
Yuanjian Li 4cd5fa96d8 [SPARK-36347][SS] Upgrade the RocksDB version to 6.20.3
### What changes were proposed in this pull request?
As discussed in https://github.com/apache/spark/pull/32928/files#r654049392, after confirming compatibility we can use a newer RocksDB version for the state store implementation.

### Why are the changes needed?
For better ARM support and to leverage the bug fixes in the newer version.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #33578 from xuanyuanking/SPARK-36347.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-07-29 11:08:58 -07:00
Kousuke Saruta db18866742 [SPARK-36323][SQL] Support ANSI interval literals for TimeWindow
### What changes were proposed in this pull request?

This PR proposes to support ANSI interval literals for `TimeWindow`.

### Why are the changes needed?

Watermark already supports ANSI interval literals, so it makes sense to support them for `TimeWindow` as well.
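
A hedged sketch of the new capability, using an ANSI interval literal as the window duration (the rate source is only for illustration):

```scala
import org.apache.spark.sql.functions.{col, count, lit, window}

val events = spark.readStream.format("rate").load() // columns: timestamp, value
val windowed = events
  .groupBy(window(col("timestamp"), "INTERVAL '5' MINUTE")) // ANSI interval literal as the duration
  .agg(count(lit(1)).as("events"))
```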

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33551 from sarutak/window-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-07-29 08:51:51 +03:00
Cheng Su 6a8dd3229a [SPARK-36272][SQL][TEST] Change shuffled hash join metrics test to check relative value of build size
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/33447, where the unit test was disabled due to a failure after the memory settings changed. I found the root cause: after https://github.com/apache/spark/pull/33447, the Spark memory page byte size in the unit test changed from `67108864` to `33554432` [1]. So the shuffled hash join build size also changed accordingly, due to the [memory page byte size change](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L457). Previously the unit test checked the exact value of the build size, so it no longer works. Here we change the unit test to verify the relative value of the build size instead.

[1]: I printed out the memory page byte size explicitly in unit test - `org.apache.spark.SparkException: chengsu pageSizeBytes: 33554432!` in https://github.com/c21/spark/runs/3186680616?check_suite_focus=true .

### Why are the changes needed?

Make previously disabled unit test work.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed unit test itself.

Closes #33494 from c21/test.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-29 11:14:34 +09:00
Linhong Liu ed0e351f05 [SPARK-36286][SQL] Block some invalid datetime string
### What changes were proposed in this pull request?
In PR #32959, we found some weird datetime strings that can be parsed. ([details](https://github.com/apache/spark/pull/32959#discussion_r665015489))
This PR blocks these invalid datetime strings.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
Yes, the strings below will have different results when cast to datetime.
```sql
select cast('12::' as timestamp); -- Before: 2021-07-07 12:00:00, After: NULL
select cast('T' as timestamp); -- Before: 2021-07-07 00:00:00, After: NULL
```

### How was this patch tested?
some new test cases

Closes #33490 from linhongliu-db/SPARK-35780-block-invalid-format.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-29 09:16:46 +08:00
Venki Korukanti eb4d1c0332 [SPARK-36236][SS] Additional metrics for RocksDB based state store implementation
### What changes were proposed in this pull request?

Proposing to add new metrics to `customMetrics` under `stateOperators` in the `StreamingQueryProgress` event. These metrics provide better visibility into the RocksDB-based state store in streaming jobs. For full details of the metrics, refer to https://issues.apache.org/jira/browse/SPARK-36236.

### Why are the changes needed?

The current metrics available for the RocksDB state store do not provide observability into many operations, such as how much time RocksDB spends in compaction or what the cache hit ratio is. The new metrics help compare performance differences in state store operations between slow and fast microbatches.
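
A hedged sketch of where these metrics surface (the listener API is standard; the specific metric names are not shown here):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print the per-operator custom metrics, which is where the new RocksDB metrics appear.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.foreach(op => println(op.customMetrics))
  }
})
```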

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unittests

Closes #33455 from vkorukanti/rocksdb-metrics.

Authored-by: Venki Korukanti <venki.korukanti@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-28 12:51:42 -07:00
dgd-contributor e1c50ff779 [SPARK-36229][SQL] conv() inconsistently handles invalid strings with more than 64 invalid characters and return wrong value on overflow
### What changes were proposed in this pull request?
1. `conv()` behaves inconsistently: the returned value differs above the 64-character threshold.

```
scala> spark.sql("select conv(repeat('?', 64), 10, 16)").show
+---------------------------+
|conv(repeat(?, 64), 10, 16)|
+---------------------------+
|                          0|
+---------------------------+

scala> spark.sql("select conv(repeat('?', 65), 10, 16)").show // which should be 0
+---------------------------+
|conv(repeat(?, 65), 10, 16)|
+---------------------------+
|           FFFFFFFFFFFFFFFF|
+---------------------------+

scala> spark.sql("select conv(repeat('?', 65), 10, -16)").show // which should be 0
+----------------------------+
|conv(repeat(?, 65), 10, -16)|
+----------------------------+
|                          -1|
+----------------------------+

scala> spark.sql("select conv(repeat('?', 64), 10, -16)").show
+----------------------------+
|conv(repeat(?, 64), 10, -16)|
+----------------------------+
|                           0|
+----------------------------+
```

2. `conv` should return a result equal to the max unsigned long value in base `toBase` when there is overflow.

```
scala> spark.sql("select conv('aaaaaaa0aaaaaaa0a', 16, 10)").show // which should be 18446744073709551615

+-------------------------------+
|conv(aaaaaaa0aaaaaaa0a, 16, 10)|
+-------------------------------+
|           12297828695278266890|
+-------------------------------+
```

### Why are the changes needed?
Bug fix. This pull request aims to make the `conv` function behave similarly to MySQL's `conv` function.
### Does this PR introduce _any_ user-facing change?
change in result of conv() function
### How was this patch tested?
add test

Closes #33459 from dgd-contributor/SPARK-36229_convInconsistencyBehaviorWithMoreThan64Characters.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-29 00:19:04 +08:00
Pablo Langa f12793de20 [SPARK-35320][SQL] Improve error message for unsupported key types in MapType in from_json expression
### What changes were proposed in this pull request?

Currently, when a map is parsed by the `from_json` function, only `StringType` keys are supported. If you try to parse another key type, it results in a cast exception.
For example:
```scala
Seq((s"""{"2021-05-05T20:05:08": "sampleValue"}"""))
  .toDF("value")
  .withColumn("value1", from_json(col("value"),  MapType(TimestampType, StringType)))
  .show
```
```
Exception in thread "main" java.lang.ClassCastException: class org.apache.spark.unsafe.types.UTF8String cannot be cast to class java.lang.Long (org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app'; java.lang.Long is in module java.base of loader 'bootstrap')
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToString$8$adapted(Cast.scala:297)
	at org.apache.spark.sql.catalyst.expressions.CastBase.buildCast(Cast.scala:285)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToString$7(Cast.scala:297)
```
This PR proposes to improve the error message.
```
org.apache.spark.sql.AnalysisException: cannot resolve 'entries' due to data type mismatch: Input schema map<timestamp,string> can only contain StringType as a key type for a MapType.;
'Project [unresolvedalias(from_json(MapType(TimestampType,StringType,true), value#1, Some(America/Los_Angeles)), Some(org.apache.spark.sql.Column$$Lambda$1496/54693608710e5bf9c))]
+- LocalRelation [value#1]
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:182)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
...
```
In https://github.com/apache/spark/pull/32599 we decided to improve the error message instead of supporting this.

### Why are the changes needed?

Avoid confusion in the interpretation of the error

### Does this PR introduce _any_ user-facing change?

Yes, the error message returned in this case

### How was this patch tested?

Unit testing and manual testing

Closes #33525 from planga82/feature/spark35320_improve_error_message.

Authored-by: Pablo Langa <soypab@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 14:46:06 +08:00
Angerszhuuuu f086c17b8e [SPARK-36312][SQL][FOLLOWUP] Add back ParquetSchemaConverter.checkFieldNames
### What changes were proposed in this pull request?
Add back ParquetSchemaConverter.checkFieldNames()

### Why are the changes needed?
Fix code

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #33552 from AngersZhuuuu/SPARK-36312-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 14:38:23 +08:00
Angerszhuuuu 86f44578e5 [SPARK-33865][SPARK-36202][SQL] When HiveDDL, we need check avro schema too
### What changes were proposed in this pull request?
Unify the schema check code of FileFormat and also check Avro schema field names in CREATE TABLE DDL.

### Why are the changes needed?
Refactor code

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed.

Closes #33441 from AngersZhuuuu/SPARK-36202.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 14:04:24 +08:00
Terry Kim 809b88a162 [SPARK-36006][SQL] Migrate ALTER TABLE ... ADD/REPLACE COLUMNS commands to use UnresolvedTable to resolve the identifier
### What changes were proposed in this pull request?

This PR proposes to migrate the following `ALTER TABLE ... ADD/REPLACE COLUMNS` commands to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

This is a part of effort to make the relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

### Does this PR introduce _any_ user-facing change?

After this PR, the above `ALTER TABLE ... ADD/REPLACE COLUMNS` commands will have a consistent resolution behavior.

### How was this patch tested?

Updated existing tests.

Closes #33200 from imback82/alter_add_cols.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 14:00:29 +08:00
Angerszhuuuu 59e0c25376 [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field
### What changes were proposed in this pull request?
The last PR only added the inner field check for Hive DDL; this PR adds the check for the Parquet data source write API.

### Why are the changes needed?
To fail earlier.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT.

Without this change, the added UT fails as below:
```
[info] - SPARK-36312: ParquetWriteSupport should check inner field *** FAILED *** (8 seconds, 29 milliseconds)
[info]   Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown (HiveDDLSuite.scala:3035)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Assertions.intercept(Assertions.scala:756)
[info]   at org.scalatest.Assertions.intercept$(Assertions.scala:746)
[info]   at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1563)
[info]   at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396(HiveDDLSuite.scala:3035)
[info]   at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396$adapted(HiveDDLSuite.scala:3034)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath(SQLHelper.scala:69)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath$(SQLHelper.scala:66)
[info]   at org.apache.spark.sql.QueryTest.withTempPath(QueryTest.scala:34)
[info]   at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$395(HiveDDLSuite.scala:3034)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withView(SQLTestUtils.scala:316)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withView$(SQLTestUtils.scala:314)
[info]   at org.apache.spark.sql.hive.execution.HiveDDLSuite.withView(HiveDDLSuite.scala:396)
[info]   at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$394(HiveDDLSuite.scala:3032)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:62)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:62)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: org.apache.spark.SparkException: Job aborted.
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
[info]   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
[info]   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
[info]   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
[info]   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
[info]   at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
[info]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
[info]   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
[info]   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
[info]   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
[info]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
[info]   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
[info]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
[info]   at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
[info]   at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
[info]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
[info]   at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
[info]   at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
[info]   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
[info]   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
[info]   at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
[info]   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
[info]   at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
[in
```

Closes #33531 from AngersZhuuuu/SPARK-36312.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 13:52:27 +08:00
Eugene Koifman 41a16ebf11 [SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE
### What changes were proposed in this pull request?

AQEShuffleReadExec already reports "number of skewed partitions" and "number of skewed partition splits".
It would be useful to also report "number of coalesced partitions", and for ShuffleExchange to report "number of partitions".
This way it's clear what happened on the map side and on the reduce side.

![Metrics](https://user-images.githubusercontent.com/4297661/126729820-cf01b3fa-7bc4-44a5-8098-91689766a68a.png)
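
For context, a minimal sketch of how these metrics can be exercised, assuming the standard AQE settings (the configs and query below are illustrative and not part of this PR):

```sql
-- With AQE partition coalescing on, the coalesced-partition metrics show up on the
-- AQEShuffleRead node of this query's plan in the SQL UI.
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;
SELECT key, count(*) FROM VALUES (1), (1), (2) AS t(key) GROUP BY key;
```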

### Why are the changes needed?

Improves usability

### Does this PR introduce _any_ user-facing change?

Yes, it now provides more information about `AQEShuffleReadExec` operator behavior in the metrics system.

### How was this patch tested?

Existing tests

Closes #32776 from ekoifman/PRISM-91635-customshufflereader-sql-metrics.

Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 13:49:48 +08:00
allisonwang-db 23a6ffa5dc [SPARK-36275][SQL] ResolveAggregateFunctions should work with nested fields
### What changes were proposed in this pull request?
This PR fixes an issue in `ResolveAggregateFunctions` where non-aggregated nested fields in ORDER BY and HAVING are not resolved correctly. This is because nested fields are resolved as aliases that fail to be semantically equal to any grouping/aggregate expressions.
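
As a hypothetical illustration of the affected shape (this query is not taken from the PR's tests), a nested field that is grouped on and then referenced non-aggregated in HAVING/ORDER BY:

```sql
SELECT count(*) AS cnt
FROM VALUES (named_struct('a', 1, 'b', 'x')), (named_struct('a', 2, 'b', 'y')) AS t(s)
GROUP BY s.a
HAVING s.a > 0   -- non-aggregated nested field reference
ORDER BY s.a;    -- likewise in ORDER BY
```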

### Why are the changes needed?
To fix an analyzer issue.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes #33498 from allisonwang-db/spark-36275-resolve-agg-func.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 13:35:17 +08:00
Huaxin Gao c8dd97d456 [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up
### What changes were proposed in this pull request?
Update the Javadoc and the JDBC data source doc, and address follow-up comments.

### Why are the changes needed?
Update the docs and address follow-up comments.

### Does this PR introduce _any_ user-facing change?
Yes, the new JDBC option `pushDownAggregate` is added to the JDBC data source doc.
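
A minimal sketch of how the documented option might be used; the URL, table, and column names are placeholders, not from this PR:

```sql
CREATE TEMPORARY VIEW sales
USING jdbc
OPTIONS (
  url 'jdbc:postgresql://host:5432/db',
  dbtable 'sales',
  pushDownAggregate 'true'
);
-- Simple aggregates over this view may then be pushed down to the JDBC source.
SELECT region, count(*) FROM sales GROUP BY region;
```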

### How was this patch tested?
manually checked

Closes #33526 from huaxingao/aggPD_followup.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-28 12:52:42 +08:00
Liang-Chi Hsieh 22ac98dcbf Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
This reverts commit 634f96dde4.

Closes #33533 from viirya/revert-SPARK-36136.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-07-27 19:11:42 +09:00
Linhong Liu 8e7e14dc0d [SPARK-36241][SQL] Support creating tables with null column
### What changes were proposed in this pull request?
Previously, in PR #28833, we blocked creating tables with a null-type column to follow the Hive behavior.
In this PR, I propose to restore the previous behavior and support null-type columns in a table.

### Why are the changes needed?
For a complex query, it's possible to generate a column with null type. If this happens to the input query of
CTAS, the query will fail because Spark doesn't allow creating a table with a null-type column. From the user's perspective,
it's hard to figure out why the null-type column is produced in a complicated query and how to fix it. So removing
this constraint is more friendly to users.
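
A hypothetical sketch of the CTAS shape described above (the table name is made up):

```sql
-- The input query yields a NULL-typed (void) column; this is the kind of CTAS that
-- the previous restriction rejected at analysis time.
CREATE TABLE t_ctas AS SELECT 1 AS col_1, null AS col_2;
```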

### Does this PR introduce _any_ user-facing change?
Yes, this reverts the previous behavior change in #28833. For example, the command below will succeed after this PR:
```sql
CREATE TABLE t (col_1 void, col_2 int)
```

### How was this patch tested?
newly added and existing test cases

Closes #33488 from linhongliu-db/SPARK-36241-support-void-column.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-27 17:31:52 +08:00
Wenchen Fan 068f8d434a [SPARK-36247][SQL] Check string length for char/varchar and apply type coercion in UPDATE/MERGE command
### What changes were proposed in this pull request?

We added the char/varchar support in 3.1, but the string length check is only applied to INSERT, not UPDATE/MERGE. This PR fixes it. This PR also adds the missing type coercion for UPDATE/MERGE.
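
A hedged sketch of the behavior being aligned; `some_v2_source` is a placeholder, since UPDATE/MERGE need a DSv2 source that supports them:

```sql
CREATE TABLE t (c VARCHAR(3)) USING some_v2_source;
INSERT INTO t VALUES ('abcd');   -- already failed the char/varchar length check
UPDATE t SET c = 'abcd';         -- with this change, fails the same length check
```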

### Why are the changes needed?

Complete the char/varchar support and make UPDATE/MERGE easier to use by doing type coercion.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

New UT. No built-in source supports UPDATE/MERGE, so an end-to-end test is not applicable here.

Closes #33468 from cloud-fan/char.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-27 13:57:05 +08:00
Chao Sun 634f96dde4 [SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package
### What changes were proposed in this pull request?

Move both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` to the package `org.apache.spark.sql.execution.datasources`. Did a bit of refactoring to enable this.

### Why are the changes needed?

Currently both `PruneFileSourcePartitionsSuite` and `PrunePartitionSuiteBase` are in the package `org.apache.spark.sql.hive.execution`, which doesn't look correct as these tests are not specific to Hive. Therefore, it's better to move them into `org.apache.spark.sql.execution.datasources`, the same package where the rule `PruneFileSourcePartitions` lives.

### Does this PR introduce _any_ user-facing change?

No, it's just test refactoring.

### How was this patch tested?

Using existing tests:
```
build/sbt "sql/testOnly *PruneFileSourcePartitionsSuite"
```
and
```
build/sbt "hive/testOnly *PruneHiveTablePartitionsSuite"
```

Closes #33350 from sunchao/SPARK-36136-partitions-suite.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-07-26 13:03:50 -07:00
Hyukjin Kwon 6e3d404cec [SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE
### What changes were proposed in this pull request?

This PR proposes to rename:

- Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution plans (user-facing doc/config names remain untouched)
  - `*ShuffleReaderExec` ->`*ShuffleReadExec`
  - `isLocalReader` -> `isLocalRead`
  - ...
- Rename `CustomShuffle*` prefix to `AQEShuffle*`
- Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead`

### Why are the changes needed?

There are multiple problems in the current naming:

- `CustomShuffle*` -> `AQEShuffle*`
    it sounds like it is a pluggable API. However, this is actually only used by AQE.
- `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead`
    it is the name of a rule, but it can be misread as a reader, which is counterintuitive
- `*ReaderExec` -> `*ReadExec`
    Reader execution reads a bit oddly. It would better be read execution (like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find a reason to name it after something that performs an action. See also the generated plans:

    Before:

    ```
    ...
    * HashAggregate (12)
       +- CustomShuffleReader (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ...
    ```

    After:

    ```
    ...
    * HashAggregate (12)
       +- AQEShuffleRead (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ...
    ```

### Does this PR introduce _any_ user-facing change?

No, internal refactoring.

### How was this patch tested?

Existing unittests should cover the changes.

Closes #33429 from HyukjinKwon/SPARK-36217.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 22:41:54 +08:00
Angerszhuuuu a63802f2c6 [SPARK-34402][SQL] Group exception about data format schema
### What changes were proposed in this pull request?
Group the exceptions about data format schema across different formats (ORC/Parquet).

### Why are the changes needed?
Group the exceptions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed.

Closes #33296 from AngersZhuuuu/SPARK-34402.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-07-26 19:18:43 +08:00