Commit graph

31192 commits

Author SHA1 Message Date
Huaxin Gao b04330cd38 [SPARK-36454][SQL] Not push down partition filter to ORCScan for DSv2
### What changes were proposed in this pull request?
not push down partition filter to `ORCScan` for DSv2

### Why are the changes needed?
Seems to me that partition filter is only used for partition pruning and shouldn't be pushed down to `ORCScan`. We don't push down partition filter to ORCScan in DSv1
```
== Physical Plan ==
*(1) Filter (isnotnull(value#19) AND NOT (value#19 = a))
+- *(1) ColumnarToRow
   +- FileScan orc [value#19,p1#20,p2#21] Batched: true, DataFilters: [isnotnull(value#19), NOT (value#19 = a)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-c1..., PartitionFilters: [isnotnull(p1#20), isnotnull(p2#21), (p1#20 = 1), (p2#21 = 2)], PushedFilters: [IsNotNull(value), Not(EqualTo(value,a))], ReadSchema: struct<value:string>
```
Also, we don't push down partition filter for parquet in DSv2.
https://github.com/apache/spark/pull/30652

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test suites

Closes #33680 from huaxingao/orc_filter.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-09 10:47:03 -07:00
Mick Jermsurawong 33c6d1168c [SPARK-20384][SQL] Support value class in nested schema for Dataset
### What changes were proposed in this pull request?

- This PR revisits https://github.com/apache/spark/pull/22309, and [SPARK-20384](https://issues.apache.org/jira/browse/SPARK-20384) solving the original problem, but additionally will prevent backward-compat break on schema of top-level `AnyVal` value class.
- Why previous break? We currently support top-level value classes just as any other case class; field of the underlying type is present in schema. This means any dataframe SQL filtering on this expects the field name to be present. The previous PR changes this schema and would result in breaking current usage. See test `"schema for case class that is a value class"`. This PR keeps the schema.
- We actually currently support collection of value classes prior to this change, but not case class of nested value class. This means the schema of these classes shouldn't change to prevent breaking too.
- However, what we can change, without breaking, is schema of nested value class, which will fails due to the compile problem, and thus its schema now isn't actually valid. After the change, the schema of this nested value class is now flattened
- With this PR, there's flattening only for nested value class (new), but not for top-level and collection classes (existing behavior)
- This PR revisits https://github.com/apache/spark/pull/27153 by handling tuple `Tuple2[AnyVal, AnyVal]` which is a constructor ("nested class") but is a generic type, so it should not be flattened behaving similarly to `Seq[AnyVal]`

### Why are the changes needed?

- Currently, nested value class isn't supported. This is because when the generated code treats `anyVal` class in its unwrapped form, but we encode the type to be the wrapped case class. This results in compile of generated code
For example,
For a given `AnyVal` wrapper and its root-level class container
```
case class IntWrapper(i: Int) extends AnyVal
case class ComplexValueClassContainer(c: IntWrapper)
```
The problematic part of generated code:
```
    private InternalRow If_1(InternalRow i) {
        boolean isNull_42 = i.isNullAt(0);
        // 1) ******** The root-level case class we care
        org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer value_46 = isNull_42 ?
            null : ((org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer) i.get(0, null));
        if (isNull_42) {
            throw new NullPointerException(((java.lang.String) references[5] /* errMsg */ ));
        }
        boolean isNull_39 = true;
        // 2) ******** We specify its member to be unwrapped case class extending `AnyVal`
        org.apache.spark.sql.catalyst.encoders.IntWrapper value_43 = null;
        if (!false) {

            isNull_39 = false;
            if (!isNull_39) {
                // 3) ******** ERROR: `c()` compiled however is of type `int` and thus we see error
                value_43 = value_46.c();
            }
        }
```
We get this errror: Assignment conversion not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper"
```
java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException:
File 'generated.java', Line 159, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 159, Column 1: Assignment conversion not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper"
```

From [doc](https://docs.scala-lang.org/overviews/core/value-classes.html) on value class: , Given: `class Wrapper(val underlying: Int) extends AnyVal`,
1) "The type at compile time is `Wrapper`, but at runtime, the representation is an `Int`". This implies that when our struct has a field of value class, the generated code should support the underlying type during runtime execution.
2) `Wrapper` "must be instantiated... when a value class is used as a type argument". This implies that `scala.Tuple[Wrapper, ...], Seq[Wrapper], Map[String, Wrapper], Option[Wrapper]` will still contain Wrapper as-is in during runtime instead of `Int`.

### Does this PR introduce _any_ user-facing change?

- Yes, this will allow support for the nested value class.

### How was this patch tested?

- Added unit tests to illustrate
  - raw schema
  - projection
  - round-trip encode/decode

Closes #33205 from mickjermsurawong-stripe/SPARK-20384-2.

Lead-authored-by: Mick Jermsurawong <mickjermsurawong@stripe.com>
Co-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-08-09 08:47:35 -05:00
Jungtaek Lim 004b87d9a1 [SPARK-36455][SS] Provide an example of complex session window via flatMapGroupsWithState
### What changes were proposed in this pull request?

This PR proposes to add a new example of complex sessionization, which leverages flatMapGroupsWithState.

### Why are the changes needed?

We have replaced an example of sessionization from flatMapGroupsWithState to native support of session window. Given there are still use cases on sessionization which native support of session window cannot cover, it would be nice if we can demonstrate such case. It will also be used as an example of flatMapGroupsWithState.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested. Example data is given in class doc.

Closes #33681 from HeartSaVioR/SPARK-36455.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-08-09 19:39:49 +09:00
yangjie01 900908b9be [SPARK-36410][CORE][SQL][STRUCTURED STREAMING][EXAMPLES] Replace anonymous classes with lambda expressions
### What changes were proposed in this pull request?
The main change of this pr is replace anonymous classes with lambda expressions in Java code

**Before**
```java
 new Thread(new Runnable() {
    Override
    public void run() {
      // run thread
    }
  });
```

**After**

```java
new Thread(() -> {
    // run thread
  });
```

### Why are the changes needed?
Code Simpilefications.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action
- Manually test `JavaUserDefinedScalar` with command
   `bin/spark-submit run-example org.apache.spark.examples.sql.JavaUserDefinedScalar` passed

Closes #33635 from LuciferYang/lambda.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-09 19:28:31 +09:00
Wenchen Fan 9a539d5846 [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE
### What changes were proposed in this pull request?

This PR fixes a performance regression introduced in https://github.com/apache/spark/pull/33172

Before #33172 , the target size is adaptively calculated based on the default parallelism of the spark cluster. Sometimes it's very small and #33172 sets a min partition size to fix perf issues. Sometimes the calculated size is reasonable, such as dozens of MBs.

After #33172 , we no longer calculate the target size adaptively, and by default always coalesce the partitions into 1 MB. This can cause perf regression if the adaptively calculated size is reasonable.

This PR brings back the code that adaptively calculate the target size based on the default parallelism of the spark cluster.

### Why are the changes needed?

fix perf regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33655 from cloud-fan/minor.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 17:25:55 +08:00
Angerszhuuuu f3e079b09b [SPARK-36271][SQL] Unify V1 insert check field name before prepare writter
### What changes were proposed in this pull request?
Unify DataSource V1 insert schema check field name before prepare writer.
And in this PR we add check for avro V1 insert too.

### Why are the changes needed?
Unify code and add check for avro V1 insert too.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33566 from AngersZhuuuu/SPARK-36271.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 17:18:06 +08:00
ulysses-you bb6f65acca [SPARK-36424][SQL] Support eliminate limits in AQE Optimizer
### What changes were proposed in this pull request?

* override the maxRows method in `LogicalQueryStage`
* add rule `EliminateLimits` in `AQEOptimizer`

### Why are the changes needed?

In Ad-hoc scenario, we always add limit for the query if user have no special limit value, but not all limit is nesessary.

With the power of AQE, we can eliminate limits using running statistics.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes #33651 from ulysses-you/SPARK-36424.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 16:51:51 +08:00
Angerszhuuuu e051a540a1 [SPARK-36352][SQL] Spark should check result plan's output schema name
### What changes were proposed in this pull request?
Spark should check result plan's output schema name

### Why are the changes needed?
In current code, some optimizer rule may change plan's output schema, since in the code we always use semantic equal to check output, but it may change the plan's output schema.
For example, for SchemaPruning, if we have a plan
```
Project[a, B]
|--Scan[A, b, c]
```
the origin output schema is `a, B`, after SchemaPruning. it become
```
Project[A, b]
|--Scan[A, b]
```
It change the plan's schema. when we use CTAS, the schema is same as query plan's output.
Then since we change the schema, it not consistent with origin SQL. So we need to check final result plan's schema with origin plan's schema

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existed UT

Closes #33583 from AngersZhuuuu/SPARK-36352.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 16:47:56 +08:00
Wenchen Fan 8714eefe6f [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/33140 to propose a simpler idea for integrating columnar execution into AQE.

Instead of making the `ColumnarToRowExec` and `RowToColumnarExec` dynamic to handle `AdaptiveSparkPlanExec`, it's simpler to let the consumer decide if it needs columnar output or not, and pass a boolean flag to `AdaptiveSparkPlanExec`.

For Spark vendors, they can set the flag differently in their custom columnar parquet writing command when the input plan is `AdaptiveSparkPlanExec`.

One argument is if we need to look at the final plan of AQE and consume the data differently (either row or columnar format). I can't think of a use case and I think we can always statically know if the AQE plan should output row or columnar data.

### Why are the changes needed?

code simplification.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

manual test

Closes #33624 from cloud-fan/aqe.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 16:33:52 +08:00
Terry Kim b3d7ebb2df [SPARK-36450][SQL] Remove unused UnresolvedV2Relation
### What changes were proposed in this pull request?

Now that all the commands that use `UnresolvedV2Relation` have been migrated to use `UnresolvedTable` and `UnresolvedView` (e.g, #33200), `UnresolvedV2Relation` can be removed.

### Why are the changes needed?

To remove unused code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Removing dead code and no code coverage existed before.

Closes #33677 from imback82/remove_unresolvedv2relation.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-09 16:27:45 +08:00
itholic a9f371c247 [SPARK-36369][PYTHON] Fix Index.union to follow pandas 1.3
### What changes were proposed in this pull request?

This PR proposes fixing the `Index.union` to follow the behavior of pandas 1.3.

Before:
```python
>>> ps_idx1 = ps.Index([1, 1, 1, 1, 1, 2, 2])
>>> ps_idx2 = ps.Index([1, 1, 2, 2, 2, 2, 2])
>>> ps_idx1.union(ps_idx2)
Int64Index([1, 1, 1, 1, 1, 2, 2], dtype='int64')
```

After:
```python
>>> ps_idx1 = ps.Index([1, 1, 1, 1, 1, 2, 2])
>>> ps_idx2 = ps.Index([1, 1, 2, 2, 2, 2, 2])
>>> ps_idx1.union(ps_idx2)
Int64Index([1, 1, 1, 1, 1, 2, 2, 2, 2, 2], dtype='int64')
```

This bug is fixed in https://github.com/pandas-dev/pandas/issues/36289.

### Why are the changes needed?

We should follow the behavior of pandas as much as possible.

### Does this PR introduce _any_ user-facing change?

Yes, the result for some cases have duplicates values will change.

### How was this patch tested?

Unit test.

Closes #33634 from itholic/SPARK-36369.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-09 11:10:01 +09:00
Sajith Ariyarathna 5a22f9ceaf [SPARK-36432][BUILD] Upgrade Jetty version to 9.4.43
### What changes were proposed in this pull request?
This PR upgrades Jetty version to `9.4.43.v20210629`.

### Why are the changes needed?
To address vulnerability https://nvd.nist.gov/vuln/detail/CVE-2021-34429 which affects Jetty `9.4.42.v20210604`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI

Closes #33656 from this/upgrade-jetty-9.4.43.

Lead-authored-by: Sajith Ariyarathna <sajith.janaprasad@gmail.com>
Co-authored-by: Sajith Ariyarathna <this@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-09 10:14:07 +09:00
Weichen Xu f9f6c0d350 [SPARK-36425][PYSPARK][ML] Support CrossValidatorModel get standard deviation of metrics for each paramMap
Signed-off-by: Weichen Xu <weichen.xudatabricks.com>

### What changes were proposed in this pull request?
Support CrossValidatorModel get standard deviation of metrics for each paramMap.

### Why are the changes needed?
So that in mlflow autologging, we can log standard deviation of metrics which is useful.

### Does this PR introduce _any_ user-facing change?
Yes.
`CrossValidatorModel` add a public attribute `stdMetrics` which are the standard deviation of metrics for each paramMap

### How was this patch tested?
Unit test.

Closes #33652 from WeichenXu123/add_std_metric.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-09 10:08:52 +09:00
Yuming Wang 4624e59ac6 [SPARK-36359][SQL] Coalesce drop all expressions after the first non nullable expression
### What changes were proposed in this pull request?

`Coalesce` drop all expressions after the first non nullable expression. For example:
```scala
sql("create table t1(a string, b string) using parquet")
sql("select a, Coalesce(count(b), 0) from t1 group by a").explain(true)
```

Before this pr:
```
== Optimized Logical Plan ==
Aggregate [a#0], [a#0, coalesce(count(b#1), 0) AS coalesce(count(b), 0)#3L]
+- Relation default.t1[a#0,b#1] parquet
```
After this pr:
```
== Optimized Logical Plan ==
Aggregate [a#0], [a#0, count(b#1) AS coalesce(count(b), 0)#3L]
+- Relation default.t1[a#0,b#1] parquet
```

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33590 from wangyum/SPARK-36359.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-08-06 23:54:24 +08:00
Min Shen 6e729515fd [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
### What changes were proposed in this pull request?

On the client side, we are currently randomizing the order of push requests before processing each request. In addition we can further randomize the order of blocks within each push request before pushing them.
In our benchmark, this has resulted in a 60%-70% reduction of blocks that fail to be merged due to bock collision (the existing block merge ratio is already pretty good in general, and this further improves it).

### Why are the changes needed?

Improve block merge ratio for push-based shuffle

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Straightforward small change, no additional test needed.

Closes #33649 from Victsm/SPARK-36423.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-06 09:47:42 -05:00
Yuto Akutsu 41b011e416 [SPARK-595][DOCS] Add local-cluster mode option in Documentation
### What changes were proposed in this pull request?

Add local-cluster mode option to submitting-applications.md

### Why are the changes needed?

Help users to find/use this option for unit tests.

### Does this PR introduce _any_ user-facing change?

Yes, docs changed.

### How was this patch tested?

`SKIP_API=1 bundle exec jekyll build`
<img width="460" alt="docchange" src="https://user-images.githubusercontent.com/87687356/127125380-6beb4601-7cf4-4876-b2c6-459454ce2a02.png">

Closes #33537 from yutoacts/SPARK-595.

Lead-authored-by: Yuto Akutsu <yuto.akutsu@jp.nttdata.com>
Co-authored-by: Yuto Akutsu <yuto.akutsu@nttdata.com>
Co-authored-by: Yuto Akutsu <87687356+yutoacts@users.noreply.github.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-08-06 09:26:13 -05:00
Kousuke Saruta e17612d0bf Revert "[SPARK-36429][SQL] JacksonParser should throw exception when data type unsupported"
### What changes were proposed in this pull request?

This PR reverts the change in SPARK-36429 (#33654).
See [conversation](https://github.com/apache/spark/pull/33654#issuecomment-894160037).

### Why are the changes needed?

To recover CIs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #33670 from sarutak/revert-SPARK-36429.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-08-06 20:56:24 +09:00
gaoyajun02 888f8f03c8 [SPARK-36339][SQL] References to grouping that not part of aggregation should be replaced
### What changes were proposed in this pull request?

Currently, references to grouping sets are reported as errors after aggregated expressions, e.g.
```
SELECT count(name) c, name
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);
```
Error in query: expression 'people.`name`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;

### Why are the changes needed?

Fix the map anonymous function in the constructAggregateExprs function does not use underscores to avoid

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes #33574 from gaoyajun02/SPARK-36339.

Lead-authored-by: gaoyajun02 <gaoyajun02@gmail.com>
Co-authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 16:34:37 +08:00
dgd-contributor 7bb53b85f0 [SPARK-36098][CORE] Grouping exception in core/storage
### What changes were proposed in this pull request?
This PR group exception messages in core/src/main/scala/org/apache/spark/storage

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #33530 from dgd-contributor/SPARK_36098_group_exception_core_storage.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 16:17:53 +08:00
ulysses-you c97fb68885 [SPARK-35221][SQL] Add the check of supported join hints
### What changes were proposed in this pull request?

Print warning msg if join hint is not supported for the specified build side.

### Why are the changes needed?

Currently we support specify the join implementation with hint, but Spark did not promise it.

For example broadcast outer join and hash outer join we need to check if its build side was supported. And at least we should print some warning log instead of changing to other join implementation silently.

### Does this PR introduce _any_ user-facing change?

Yes, warning log might be printed.

### How was this patch tested?

Add new test.

Closes #32355 from ulysses-you/SPARK-35221.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 15:29:43 +08:00
Kousuke Saruta 63c7d1847d [SPARK-36429][SQL][FOLLOWUP] Update a golden file to comply with the change in SPARK-36429
### What changes were proposed in this pull request?

This PR updates a golden to comply with the change in SPARK-36429 (#33654).

### Why are the changes needed?

To recover GA failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA itself.

Closes #33663 from sarutak/followup-SPARK-36429.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 15:20:54 +08:00
gengjiaan eb12727bc7 [SPARK-36429][SQL] JacksonParser should throw exception when data type unsupported
### What changes were proposed in this pull request?
Currently, when `set spark.sql.timestampType=TIMESTAMP_NTZ`, the behavior is different between `from_json` and `from_csv`.
```
-- !query
select from_json('{"t":"26/October/2015"}', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<from_json({"t":"26/October/2015"}):struct<t:timestamp_ntz>>
-- !query output
{"t":null}
```

```
-- !query
select from_csv('26/October/2015', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<>
-- !query output
java.lang.Exception
Unsupported type: timestamp_ntz
```

We should make `from_json` throws exception too.
This PR fix the discussion below
https://github.com/apache/spark/pull/33640#discussion_r682862523

### Why are the changes needed?
Make the behavior of `from_json` more reasonable.

### Does this PR introduce _any_ user-facing change?
'Yes'.
from_json throwing Exception when we set spark.sql.timestampType=TIMESTAMP_NTZ.

### How was this patch tested?
Tests updated.

Closes #33654 from beliefer/SPARK-36429.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-06 12:53:04 +08:00
Gengliang Wang 8a35243fa7 [SPARK-36415][SQL][DOCS] Add docs for try_cast/try_add/try_divide
### What changes were proposed in this pull request?

Add documentation for new functions try_cast/try_add/try_divide

### Why are the changes needed?

Better documentation. These new functions are useful when migrating to the ANSI dialect.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

Build docs and preview:
![image](https://user-images.githubusercontent.com/1097932/128209312-34a6cc6a-a73d-4aed-8646-22b1cb7ce702.png)

Closes #33638 from gengliangwang/addDocForTry.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 12:32:57 +09:00
xiepengjie 8230a2a700 [SPARK-36420][GRAPHX] Use isEmpty to improve performance in Pregel‘s superstep
### What changes were proposed in this pull request?

When recived active-messages in Pregel,  we only need an action operator here and active-messages are not empty, so we don’t need to use count, it’s better to use isEmpty.

### Why are the changes needed?

In the case of 10+ billions of vertices and edges, it will make the speed faster.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existed tests.

Closes #33648 from StefanXiepj/SPARK-36420.

Authored-by: xiepengjie <xiepengjie@kuaishou.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 12:20:47 +09:00
Liang-Chi Hsieh cd070f1b9c [SPARK-36393][BUILD][FOLLOW-UP] Try to raise memory for GHA
### What changes were proposed in this pull request?

As followup to raise memory for two places forgotten.

### Why are the changes needed?

Raise memory for GHA.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA

Closes #33658 from viirya/increasing-mem-ga-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-05 20:09:30 -07:00
Kent Yao c7fa3c9090 [SPARK-36421][SQL][DOCS] Use ConfigEntry.key to fix docs and set command results
### What changes were proposed in this pull request?

This PR fixes the issue that `ConfigEntry` to be introduced to the doc field directly without calling `.key`, which causes malformed documents on the web site and in the result of `SET -v`

1. https://spark.apache.org/docs/3.1.2/configuration.html#static-sql-configuration - spark.sql.hive.metastore.jars

2. set -v
![image](https://user-images.githubusercontent.com/8326978/128292412-85100f95-24fd-4b40-a14f-d31a256dab7d.png)

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no, but contains doc fix
### How was this patch tested?

new tests

Closes #33647 from yaooqinn/SPARK-36421.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 11:01:47 +09:00
Kousuke Saruta 856c9a58f8 [SPARK-36173][CORE][PYTHON][FOLLOWUP] Add type hint for TaskContext.cpus
### What changes were proposed in this pull request?

This PR adds type hint for `TaskContext.cpus` added in SPARK-36173 (#33385)

### Why are the changes needed?

To comply with Project Zen.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Confirmed typehint works with IntelliJ IDEA.

Closes #33645 from sarutak/taskcontext-pyi.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 10:56:10 +09:00
Kousuke Saruta e13bd586f1 [SPARK-36441][INFRA] Fix GA failure related to downloading lintr dependencies
### What changes were proposed in this pull request?

This PR fixes a GA failure which is related to downloading lintr dependencies.
```
 * installing *source* package ‘devtools’ ...
** package ‘devtools’ successfully unpacked and MD5 sums checked
** using staged installation
** R
** inst
** byte-compile and prepare package for lazy loading
** help
*** installing help indices
*** copying figures
** building package indices
** installing vignettes
** testing if installed package can be loaded from temporary location
** testing if installed package can be loaded from final location
** testing if installed package keeps a record of temporary installation path
* DONE (devtools)

The downloaded source packages are in
	‘/tmp/Rtmpv53Ix4/downloaded_packages’
Using bundled GitHub PAT. Please add your own PAT to the env var `GITHUB_PAT`
Error: Failed to install 'unknown package' from GitHub:
  HTTP error 401.
  Bad credentials
```

I re-triggered the GA job but it still fail with the same error.
https://github.com/apache/spark/runs/3257853825

The issue seems to happen when downloading lintr dependencies from GitHub. So, the solution is to change the way to download them.

### Why are the changes needed?

To recover GA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA itself.

Closes #33660 from sarutak/fix-r-package-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-06 10:49:27 +09:00
Kent Yao 0c94e47aec [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE
### What changes were proposed in this pull request?

This reverts SPARK-31475, as there are always more concurrent jobs running in AQE mode, especially when running multiple queries at the same time. Currently, the broadcast timeout does not record accurately for the BroadcastQueryStageExec only, but also including the time waiting for being scheduled. If all the resources are currently being occupied for materializing other stages, it timeouts without a chance to run actually.

 

![image](https://user-images.githubusercontent.com/8326978/128169612-4c96c8f6-6f8e-48ed-8eaf-450f87982c3b.png)

 

The default value is 300s, and it's hard to adjust the timeout for AQE mode. Usually, you need an extremely large number for real-world cases. As you can see in the example, above, the timeout we used for it was 1800s, and obviously, it needed 3x more or something

 

### Why are the changes needed?

AQE is default now, we can make it more stable with this PR

### Does this PR introduce _any_ user-facing change?

yes, broadcast timeout now is not used for AQE

### How was this patch tested?

modified test

Closes #33636 from yaooqinn/SPARK-36414.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-05 21:15:35 +08:00
Wenchen Fan 095f9ff75b [SPARK-36409][SQL][TESTS] Splitting test cases from datetime.sql
### What changes were proposed in this pull request?

Currently `datetime.sql` contains a lot of tests and will be run 3 times: default mode, ansi mode, ntz mode. It wastes the test time and also large test files are hard to read.

This PR proposes to split it into smaller ones:
1. `date.sql`, which contains date literals, functions and operations. It will be run twice with default and ansi mode.
2. `timestamp.sql`, which contains timestamp (no ltz or ntz suffix) literals, functions and operations. It will be run 4 times: default mode + ans off, defaul mode + ansi on, ntz mode + ansi off, ntz mode + ansi on.
3. `datetime_special.sql`, which create datetime values whose year is outside of [0, 9999]. This is a separated file as JDBC doesn't support them and need to ignore this test file. It will be run 4 times as well.
4. `timestamp_ltz.sql`, which contains timestamp_ltz literals and constructors. It will be run twice with default and ntz mode, to make sure its result doesn't change with the timestamp mode. Note that, operations with ltz are tested by `timestamp.sql`
5. `timestamp_ntz.sql`, which contains timestamp_ntz literals and constructors. It will be run twice with default and ntz mode, to make sure its result doesn't change with the timestamp mode. Note that, operations with ntz are tested by `timestamp.sql`

### Why are the changes needed?

reduce test run time.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #33640 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-05 20:55:32 +08:00
Angerszhuuuu 02810eecbf [SPARK-36353][SQL] RemoveNoopOperators should keep output schema
### What changes were proposed in this pull request?
 RemoveNoopOperators should keep output schema

### Why are the changes needed?
Expand function

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not need

Closes #33587 from AngersZhuuuu/SPARK-36355.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-05 20:43:35 +08:00
Liang-Chi Hsieh 7d13ac177b [SPARK-36393][BUILD] Try to raise memory for GHA
### What changes were proposed in this pull request?

According to the feedback from GitHub, the change causing memory issue has been rolled back. We can try to raise memory again for GA.

### Why are the changes needed?

Trying higher memory settings for GA. It could speed up the testing time.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA

Closes #33623 from viirya/increasing-mem-ga.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-05 01:31:35 -07:00
Angerszhuuuu b377ea26e2 [SPARK-36391][SHUFFLE] When state is remove will throw NPE, and we should improve the error message
### What changes were proposed in this pull request?
When channel terminated will call `connectionTerminated` and remove corresponding StreamState,
then all coming request on this StreamState will throw NPE like
```
2021-07-31 22:00:24,810 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1119950114515,chunkIndex=0],errorString=java.lang.NullPointerException
	at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:80)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:101)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
	at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
] to /ip:53818; closing connection
java.nio.channels.ClosedChannelException
	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
	at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
	at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```

Since JVM will not show stack of NPE exception if it happen many times.
```
021-07-28 08:25:44,720 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1187623335353,chunkIndex=11],errorString=java.lang.NullPoint
erException
] to /10.130.10.5:42148; closing connection
java.nio.channels.ClosedChannelException
```
Makes user confused.
We should improved this error message?

### Why are the changes needed?
Improve error message

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #33622 from AngersZhuuuu/SPARK-36391.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmaihu@gmail.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
2021-08-05 15:30:00 +08:00
Wu, Xiaochang f6e6d1157a [SPARK-36173][CORE] Support getting CPU number in TaskContext
In stage-level resource scheduling, the allocated 3rd party resources can be obtained in TaskContext using resources() interface, however there is no API to get how many cpus are allocated for the task. Will add a cpus() interface to TaskContext to complement resources(). Althrough the task cpu requests can be got from profile, it's more convenient to get it inside the task code without the need to pass profile from driver side to the executor side.

### What changes were proposed in this pull request?
Add cpus() interface in TaskContext and modify relevant code.

### Why are the changes needed?
TaskContext has resources() to get 3rd party resources allocated. the is no API to get CPU allocated for the task.

### Does this PR introduce _any_ user-facing change?
Add cpus() interface for TaskContext

### How was this patch tested?
Unit tests

Closes #33385 from xwu99/taskcontext-cpus.

Lead-authored-by: Wu, Xiaochang <xiaochang.wu@intel.com>
Co-authored-by: Xiaochang Wu <xiaochang.wu@intel.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-04 21:14:01 -05:00
yi.wu 3b92c721b5 [SPARK-36384][CORE][DOC] Add doc for shuffle checksum
### What changes were proposed in this pull request?

Add doc for the shuffle checksum configs in `configuration.md`.

### Why are the changes needed?

doc

### Does this PR introduce _any_ user-facing change?

No, since Spark 3.2 hasn't been released.

### How was this patch tested?

Pass existed tests.

Closes #33637 from Ngone51/SPARK-36384.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-05 10:16:46 +09:00
Kousuke Saruta 0f5c3a4fd6 [SPARK-36068][BUILD][TEST] No tests in hadoop-cloud run unless hadoop-3.2 profile is activated explicitly
### What changes were proposed in this pull request?

This PR fixes an issue that no tests in `hadoop-cloud` are compiled and run unless `hadoop-3.2` profile is activated explicitly.
The root cause seems similar to SPARK-36067 (#33276) so the solution is to activate `hadoop-3.2` profile in `hadoop-cloud/pom.xml` by default.

This PR introduced an empty profile for `hadoop-2.7`. Without this, building with `hadoop-2.7` fails.

### Why are the changes needed?

`hadoop-3.2` profile should be activated by default so tests in `hadoop-cloud` also should be compiled and run without activating `hadoop-3.2` profile explicitly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Confirmed tests in `hadoop-cloud` ran with both SBT and Maven.
```
build/sbt -Phadoop-cloud "hadoop-cloud/test"
...
[info] CommitterBindingSuite:
[info] - BindingParquetOutputCommitter binds to the inner committer (258 milliseconds)
[info] - committer protocol can be serialized and deserialized (11 milliseconds)
[info] - local filesystem instantiation (3 milliseconds)
[info] - reject dynamic partitioning (1 millisecond)
[info] Run completed in 1 second, 234 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

build/mvn -Phadoop-cloud -pl hadoop-cloud test
...
CommitterBindingSuite:
- BindingParquetOutputCommitter binds to the inner committer
- committer protocol can be serialized and deserialized
- local filesystem instantiation
- reject dynamic partitioning
Run completed in 560 milliseconds.
Total number of tests run: 4
Suites: completed 2, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

I also confirmed building with `-Phadoop-2.7` successfully finishes with both SBT and Maven.
```
build/sbt -Phadoop-cloud -Phadoop-2.7 "hadoop-cloud/Test/compile"
build/mvn -Phadoop-cloud -Phadoop-2.7 -pl hadoop-cloud testCompile
```

Closes #33277 from sarutak/fix-hadoop-3.2-cloud.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-05 09:39:28 +09:00
yangjie01 01cf6f4c6b [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache
### What changes were proposed in this pull request?
There are 3 ways to use Guava cache in spark code:

1. `Loadingcache` is the main way to use Guava cache in spark code and the key usages are as follows:
  a. `LoadingCache` with `maximumsize` data eviction policy, such as `appCache` in `ApplicationCache`, `cache` in `Codegenerator`
  b. `LoadingCache` with `maximumWeight` data eviction policy, such as `shuffleIndexCache` in `ExternalShuffleBlockResolver`
  c. `LoadingCache` with 'expireAfterWrite' data eviction policy, such as `tableRelationCache` in `SessionCatalog`
2. `ManualCache` is another way to use Guava cache in spark code and the key usage is `cache` in `SharedInMemoryCache`, it use to caches partition file statuses in memory

3. The last use way is `hadoopJobMetadata` in `SparkEnv`, it uses Guava Cache to build a `soft-reference map`.

The goal of this pr is use `Caffeine` instead of `Guava Cache` because `Caffeine` is faster than `Guava Cache` from benchmarks, the main changes as follows:

1. Add `Caffeine` deps to maven `pom.xml`

2. Use `Caffeine` instead of Guava `LoadingCache`, `ManualCache` and soft-reference map in `SparkEnv`

3. Add `LocalCacheBenchmark` to compare performance of `Loadingcache` between `Guava Cache` and `Caffeine`

### Why are the changes needed?
`Caffeine` is faster than `Guava Cache` from benchmarks

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action
- Add `LocalCacheBenchmark` to compare performance of `Loadingcache` between `Guava Cache` and `Caffeine`

Closes #31517 from LuciferYang/guava-cache-to-caffeine.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Holden Karau <hkarau@netflix.com>
2021-08-04 12:01:44 -07:00
Dongjoon Hyun 28a2a2238f [SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs
### What changes were proposed in this pull request?

This PR aims to skip rolling event log directories which has only `appstatus` file.

### Why are the changes needed?

Currently, Spark History server shows `IllegalArgumentException` warning, but the event log might arrive later. The situation also can happen when the job is killed before uploading its first log to the remote storages like S3.
```
21/07/30 07:38:26 WARN FsHistoryProvider:
Error while reading new log s3a://.../eventlog_v2_spark-95b5c736c8e44037afcf152534d08771
java.lang.IllegalArgumentException: requirement failed:
Log directory must contain at least one event log file!
...
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:216)
```

### Does this PR introduce _any_ user-facing change?

Yes. Users will not see `IllegalArgumentException` warnings.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #33586 from dongjoon-hyun/SPARK-36354.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-08-04 20:26:06 +09:00
Venkata krishnan Sowrirajan d8169493b6 [SPARK-32923][FOLLOW-UP] Clean up older shuffleMergeId shuffle files when finalize request for higher shuffleMergeId is received
### What changes were proposed in this pull request?

Clean up older shuffleMergeId shuffle files when finalize request for higher shuffleMergeId is received when no blocks pushed for the corresponding shuffleMergeId. This is identified as part of https://github.com/apache/spark/pull/33034#discussion_r680610872.

### Why are the changes needed?

Without this change, older shuffleMergeId files won't be cleaned up properly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added changes to existing unit test to address this case.

Closes #33605 from venkata91/SPARK-32923-follow-on.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-04 03:30:08 -05:00
itholic 3d72c20e64 [SPARK-35811][PYTHON][FOLLOWUP] Deprecate DataFrame.to_spark_io
### What changes were proposed in this pull request?

This PR is followup for https://github.com/apache/spark/pull/32964, to improve the warning message.

### Why are the changes needed?

To improve the warning message.

### Does this PR introduce _any_ user-facing change?

The warning is changed from "Deprecated in 3.2, Use `spark.to_spark_io` instead." to "Deprecated in 3.2, Use `DataFrame.spark.to_spark_io` instead."

### How was this patch tested?

Manually run `dev/lint-python`

Closes #33631 from itholic/SPARK-35811-followup.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-04 16:20:29 +09:00
Cheng Su de62b5ae32 [SPARK-36404][SQL] Support ORC nested column vectorized reader for data source v2
### What changes were proposed in this pull request?

We added support of nested columns in ORC vectorized reader for data source v1. Data source v2 and v1 both use same underlying implementation for vectorized reader (OrcColumnVector), so we can support data source v2 as well.

### Why are the changes needed?

Improve query performance for ORC data source v2 when reading nested columns.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added test in `OrcQuerySuite.scala`.

Closes #33626 from c21/orc-v2.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-03 19:55:14 -07:00
Kousuke Saruta c31b653806 [MINOR][DOC] Remove obsolete contributing-to-spark.md
### What changes were proposed in this pull request?

This PR removes obsolete `contributing-to-spark.md` which is not referenced from anywhere.

### Why are the changes needed?

Just clean up.

### Does this PR introduce _any_ user-facing change?

No. Users can't have access to contributing-to-spark.html unless they directly point to the URL.

### How was this patch tested?

Built the document and confirmed that this change doesn't affect the result.

Closes #33619 from sarutak/remove-obsolete-contribution-doc.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-04 10:19:24 +09:00
PengLei 87d49cbcb1 [SPARK-36381][SQL] Add case sensitive and case insensitive compare for checking column name exist when alter table
### What changes were proposed in this pull request?
Add the Resolver to `checkColumnNotExists` to check name exist in case sensitive.

### Why are the changes needed?
At now the resolver is `_ == _` of `findNestedField`  called by `checkColumnNotExists`
Add `alter.conf.resolver` to it.
[SPARK-36381](https://issues.apache.org/jira/browse/SPARK-36381)
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add ut tests

Closes #33618 from Peng-Lei/sensitive-cloumn-name.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-04 10:04:13 +09:00
Yuming Wang 4a6afb4875 [SPARK-36280][SQL] Remove redundant aliases after RewritePredicateSubquery
### What changes were proposed in this pull request?

Remove redundant aliases after `RewritePredicateSubquery`. For example:
```scala
sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")
sql(
  """
    |SELECT *
    |FROM  t1
    |WHERE  a IN (SELECT x
    |  FROM  (SELECT x AS x,
    |           Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |    FROM   t2
    |    GROUP  BY x) tmp1
    |  WHERE  ranking <= 5)
    |""".stripMargin).explain
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#68]
      +- Project [x#7L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#62]
                     +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                        +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                           +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                              +- FileScan parquet default.t2[x#15L,y#16L]
```

After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#67]
      +- Project [x#15L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                     +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                        +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                           +- FileScan parquet default.t2[x#15L,y#16L]
```

### Why are the changes needed?

Reduce shuffle to improve query performance. This change can benefit TPC-DS q70.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33509 from wangyum/SPARK-36280.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-03 13:56:59 -07:00
Max Gekk 67cbc93263 [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
### What changes were proposed in this pull request?
In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in filed-based datasource or writing a dataset to such datasource. In particular, add the following case:
```scala
case _: DayTimeIntervalType | _: YearMonthIntervalType => false
```
to all methods that override either:
- V2 `FileTable.supportsDataType()`
- V1 `FileFormat.supportDataType()`

### Why are the changes needed?
To improve user experience with Spark SQL, and output a proper error message at the analysis phase.

### Does this PR introduce _any_ user-facing change?
Yes but ANSI interval types haven't released yet. So, for users this is new behavior.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
```

Closes #33580 from MaxGekk/interval-ban-in-ds.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 20:30:20 +03:00
Kousuke Saruta 92cdb17d1a [SPARK-35815][SQL][FOLLOWUP] Add test considering the case spark.sql.legacy.interval.enabled is true
### What changes were proposed in this pull request?

This PR adds test considering the case `spark.sql.legacy.interval.enabled` is `true` for SPARK-35815.

### Why are the changes needed?

SPARK-35815 (#33456) changes `Dataset.withWatermark` to accept ANSI interval literals as `delayThreshold` but I noticed the change didn't work with `spark.sql.legacy.interval.enabled=true`.
We can't detect this issue because there is no test which considers the legacy interval type at that time.
In SPARK-36323 (#33551), this issue was resolved but it's better to add test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33606 from sarutak/test-watermark-with-legacy-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 13:48:41 +03:00
Wenchen Fan dd80457ffb [SPARK-36315][SQL] Only skip AQEShuffleReadRule in the final stage if it breaks the distribution requirement
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/30494

This PR proposes a new way to optimize the final query stage in AQE. We first collect the effective user-specified repartition (semantic-wise, user-specified repartition is only effective if it's the root node or under a few simple nodes), and get the required distribution for the final plan. When we optimize the final query stage, we skip certain `AQEShuffleReadRule` if it breaks the required distribution.

### Why are the changes needed?

The current solution for optimizing the final query stage is pretty hacky and overkill. As an example, the newly added rule `OptimizeSkewInRebalancePartitions` can hardly apply as it's very common that the query plan has shuffles with origin `ENSURE_REQUIREMENTS`, which is not supported by `OptimizeSkewInRebalancePartitions`.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated tests

Closes #33541 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-03 18:28:52 +08:00
gengjiaan 1deb386727 [SPARK-36175][SQL][FOLLOWUP] Improve the comments for AvroDeserializer/AvroSerializer
### What changes were proposed in this pull request?
This PR follows up https://github.com/apache/spark/pull/33413 and just improve the comments for `AvroDeserializer`/`AvroSerializer`.

### Why are the changes needed?
Make the comment more correctly.

### Does this PR introduce _any_ user-facing change?
'No'.
Just change the comments.

### How was this patch tested?
No need.

Closes #33607 from beliefer/SPARK-36175-followup.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-03 15:52:18 +08:00
Wenchen Fan 7cb9c1c241 [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
### What changes were proposed in this pull request?

This a followup of the recent work such as https://github.com/apache/spark/pull/33200

For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule in `ALTER TABE ... COLUMN` commands.

This PR also moves these AlterTable commands to a individual file and give them a base trait.

### Why are the changes needed?

name simplification

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes #33609 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-03 10:43:00 +03:00
Xinrong Meng 8ca11fe39f [SPARK-36192][PYTHON] Better error messages for DataTypeOps against lists
### What changes were proposed in this pull request?
Better error messages for DataTypeOps against lists.

### Why are the changes needed?
Currently, DataTypeOps against lists throw a Py4JJavaError, we shall throw a TypeError with proper messages instead.

### Does this PR introduce _any_ user-facing change?
Yes. A TypeError message will be showed rather than a Py4JJavaError.

From:
```py
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]) > [3, 2, 1]
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o107.gt.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [3, 2, 1]
...
```

To:
```py
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]) > [3, 2, 1]
Traceback (most recent call last):
...
TypeError: The operation can not be applied to list.
```

### How was this patch tested?
Unit tests.

Closes #33581 from xinrong-databricks/data_type_ops_list.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-03 16:25:49 +09:00