Commit graph

30221 commits

Author SHA1 Message Date
yi.wu 34284c0649 [SPARK-35454][SQL] One LogicalPlan can match multiple dataset ids
### What changes were proposed in this pull request?

Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.

### Why are the changes needed?

During the transformation from one Dataset to another Dataset, the DATASET_ID_TAG of logical plan won't change if the plan itself doesn't change:

b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L234-L237)

However, dataset id always changes even if the logical plan doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L207-L208)

And this can lead to the mismatch between dataset's id and col's __dataset_id. E.g.,

```scala
  test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
    // The test can fail if we change it to:
    // val df1 = spark.range(3).toDF()
    // val df2 = df1.filter($"id" > 0).toDF()
    val df1 = spark.range(3)
    val df2 = df1.filter($"id" > 0)

    withSQLConf(
      SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
      SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
      assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
    }
  }
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #32616 from Ngone51/fix-ambiguous-join.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 16:15:17 +08:00
ulysses-you 83737852f0 [SPARK-35063][SQL][FOLLOWUP] Fix scala 2.13 error
### What changes were proposed in this pull request?

### Why are the changes needed?

Fix scala compile error.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes #32617 from ulysses-you/scala2-13.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 07:59:26 +00:00
Max Gekk 2b08070e79 [SPARK-35427][SQL][TESTS] Check the EXCEPTION rebase mode for Avro/Parquet
### What changes were proposed in this pull request?
Add tests to check the `EXCEPTION` rebase mode explicitly in the datasources:
- Parquet: `DATE` type and `TIMESTAMP`: `INT96`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`
- Avro: `DATE` type and `TIMESTAMP`: `timestamp-millis` and `timestamp-micros`.

### Why are the changes needed?
1. To improve test coverage
2. The `EXCEPTION` rebase mode should be checked independently from the default settings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroV2Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```

Closes #32574 from MaxGekk/test-rebase-exception.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:18:06 +00:00
gengjiaan c740c097e0 [SPARK-35063][SQL] Group exception messages in sql/catalyst
### What changes were proposed in this pull request?
This PR group exception messages in `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32478 from beliefer/SPARK-35063.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 06:15:26 +00:00
Takeshi Yamamuro 1a923f5319 [SPARK-35479][SQL] Format PartitionFilters IN strings in scan nodes
### What changes were proposed in this pull request?

This PR proposes to format strings correctly for `PushedFilters`. For example, `explain()` for a query below prints `v in (array('a'))` as `PushedFilters: [In(v, [WrappedArray(a)])]`;

```
scala> sql("create table t (v array<string>) using parquet")
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [WrappedArray(a),null])], ReadSchema: struct<v:array<string>>
```
This PR makes `explain()` print it as `PushedFilters: [In(v, [[a]])]`;
```
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct<v:array<string>>
```
NOTE: This PR includes a bugfix caused by #32577 (See the cloud-fan comment: https://github.com/apache/spark/pull/32577/files#r636108150).

### Why are the changes needed?

To improve explain strings.

### Does this PR introduce _any_ user-facing change?

Yes, this PR improves the explain strings for pushed-down filters.

### How was this patch tested?

Added tests in `SQLQueryTestSuite`.

Closes #32615 from maropu/ExplainPartitionFilters.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-21 05:45:45 +00:00
yi.wu e1296eab5f [SPARK-35445][SQL] Reduce the execution time of DeduplicateRelations
### What changes were proposed in this pull request?

This PR reduces the execution time of `DeduplicateRelations` by:

1) use `Set` instead `Seq` to check duplicate relations

2) avoid plan output traverse and attribute rewrites when there are no changes in the children plan

### Why are the changes needed?

Rule `DeduplicateRelations` is slow.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run `TPCDSQuerySuite` and checked the run time of `DeduplicateRelations`. The time has been reduced by 77.9% after this PR.

Closes #32590 from Ngone51/improve-dedup.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
2021-05-21 13:25:37 +08:00
Kent Yao 2e9936db93 [SPARK-35456][CORE] Print the invalid value in config validation error message
### What changes were proposed in this pull request?

Print the invalid value in config validation error message for `checkValue` just like `checkValues`

### Why are the changes needed?

Invalid configuration values may come in many ways, this PR can help different kinds of users or developers to identify what the config the error is related to

### Does this PR introduce _any_ user-facing change?

yes, but only error msg
### How was this patch tested?

yes, modified tests

Closes #32600 from yaooqinn/SPARK-35456.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-21 14:22:29 +09:00
itholic 6b912e4179 [SPARK-35364][PYTHON] Renaming the existing Koalas related codes
### What changes were proposed in this pull request?

There are still naming related to Koalas in test and function name. This PR addressed them to fit pandas-on-spark.
- kdf -> psdf
- kser -> psser
- kidx -> psidx
- kmidx -> psmidx
- to_koalas() -> to_pandas_on_spark()

### Why are the changes needed?

This is because the name Koalas is no longer used in PySpark.

### Does this PR introduce _any_ user-facing change?

`to_koalas()` function is renamed to `to_pandas_on_spark()`

### How was this patch tested?

Tested in local manually.
After changing the related naming, I checked them one by one.

Closes #32516 from itholic/SPARK-35364.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-05-20 15:08:30 -07:00
Dongjoon Hyun 8e13b8c3d2 [SPARK-35463][BUILD] Skip checking checksum on a system without shasum
### What changes were proposed in this pull request?

Not every build system has `shasum`. This PR aims to skip checksum checks on a system without `shasum`.

### Why are the changes needed?

**PREPARE**
```
$ docker run -it --rm -v $PWD:/spark openjdk:11-slim /bin/bash
roota0e001a6e50f:/# cd /spark/
roota0e001a6e50f:/spark# apt-get update
roota0e001a6e50f:/spark# apt-get install curl
roota0e001a6e50f:/spark# build/mvn clean
```

**BEFORE (Failure due to `command not found`)**
```
roota0e001a6e50f:/spark# build/mvn clean
exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz
exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download
exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
Veryfing checksum from /spark/build/apache-maven-3.6.3-bin.tar.gz.sha512
build/mvn: line 81: shasum: command not found
Bad checksum from https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
```

**AFTER**
```
roota0e001a6e50f:/spark# build/mvn clean
exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz
Skipping checksum because shasum is not installed.
exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download
exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
Skipping checksum because shasum is not installed.
Using `mvn` from path: /spark/build/apache-maven-3.6.3/bin/mvn
```

### Does this PR introduce _any_ user-facing change?

Yes, this will recover the build.

### How was this patch tested?

Manually with the above process.

Closes #32613 from dongjoon-hyun/SPARK-35463.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-20 14:36:50 -07:00
Dongjoon Hyun 3757c1803d [SPARK-35462][BUILD][K8S] Upgrade Kubernetes-client to 5.4.0 to support K8s 1.21 models
### What changes were proposed in this pull request?

This PR aims to upgrade `kubernetes-client` from 5.3.1 to 5.4.0 to support K8s 1.21 models officially.

### Why are the changes needed?

`kubernetes-client` 5.4.0 has `Kubernetes Model v1.21.0`
- https://github.com/fabric8io/kubernetes-client/releases/tag/v5.4.0

### Does this PR introduce _any_ user-facing change?

No. This is a dev-only change.

### How was this patch tested?

Pass the CIs including Jenkins K8s IT.
- https://github.com/apache/spark/pull/32612#issuecomment-845456039

I tested K8s IT with the following versions.
- minikube version: v1.20.0
- K8s Client Version: v1.21.0
- Server Version: v1.21.0

```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 17 minutes, 18 seconds.
Total number of tests run: 26
Suites: completed 2, aborted 0
Tests: succeeded 26, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #32612 from dongjoon-hyun/SPARK-35462.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-20 14:34:58 -07:00
Yikun Jiang 38fbc0b4f7 [SPARK-35458][BUILD] Use > /dev/null to replace -q in shasum
## What changes were proposed in this pull request?
Use ` > /dev/null` to replace `-q` in shasum validation.

### Why are the changes needed?
In PR https://github.com/apache/spark/pull/32505 , added the shasum check on maven. The `shasum -a 512 -q -c xxx.sha` is used to validate checksum, the `-q` args is for "don't print OK for each successfully verified file", but `-q` arg is introduce in shasum 6.x version.

So we got the `Unknown option: q`.

```
➜  ~ uname -a
Darwin MacBook.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Apr 12 20:57:45 PDT 2021; root:xnu-6153.141.28.1~1/RELEASE_X86_64 x86_64
➜  ~ shasum -v
5.84
➜  ~ shasum -q
Unknown option: q
Type shasum -h for help
```

it makes ARM CI failed:
[1] https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-arm/

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
`shasum -a 512 -c wrong.sha  > /dev/null` return code 1 without print
`shasum -a 512 -c right.sha  > /dev/null` return code 0 without print
e2e test:
```
rm -f build/apache-maven-3.6.3-bin.tar.gz
rm -r build/apache-maven-3.6.3-bin
mvn -v
```

Closes #32604 from Yikun/patch-5.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-05-20 15:59:13 -05:00
Yikun Jiang 3c3533d845 [SPARK-35373][BUILD][FOLLOWUP] Fix "binary operator expected" error on build/mvn
### What changes were proposed in this pull request?
change $(command -v curl) to "$(command -v curl)"

### Why are the changes needed?
We need change $(command -v curl) to "$(command -v curl)" to make sure it work when `curl` or `wget` is uninstall. othewise raised:
`build/mvn: line 56: [: /root/spark/build/apache-maven-3.6.3-bin.tar.gz: binary operator expected`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
apt remove curl
rm -f build/apache-maven-3.6.3-bin.tar.gz
rm -r build/apache-maven-3.6.3-bin
mvn -v
```

Closes #32608 from Yikun/patch-6.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-05-20 12:25:23 -05:00
Max Gekk 2bd32548f5 [SPARK-35459][SQL][TESTS] Move AvroRowReaderSuite to a separate file
### What changes were proposed in this pull request?
Move `AvroRowReaderSuite` out from `AvroSuite.scala` and place it to `AvroRowReaderSuite.scala`.

### Why are the changes needed?
To improve code maintenance. Usually, independent test suites are placed to separate files.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroRowReaderSuite"
$ build/sbt "test:testOnly *AvroV1Suite"
$ build/sbt "test:testOnly *AvroV2Suite"
```

Closes #32607 from MaxGekk/move-AvroRowReaderSuite.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 20:04:10 +09:00
weixiuli 4869e437da [SPARK-35424][SHUFFLE] Remove some useless code in the ExternalBlockHandler
### What changes were proposed in this pull request?

Remove some useless code in the ExternalBlockHandler.

### Why are the changes needed?
There is some useless code in the ExternalBlockHandler, so we may remove it.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?

Exist unittests.

Closes #32571 from weixiuli/SPARK-35424.

Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 19:03:14 +09:00
Bo Zhang e170e63955 [SPARK-35457][BUILD] Bump ANTLR runtime version to 4.8
### What changes were proposed in this pull request?
This PR changes the antlr4-runtime version from 4.8-1 to 4.8.

### Why are the changes needed?
Version 4.8 is the official release version, with a proper release note (see https://github.com/antlr/antlr4/releases) and artifiacts listed in https://www.antlr.org/download/index.html.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Will rely on tests in the PR.

Closes #32603 from bozhang2820/antlr-4.8.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-20 17:24:40 +09:00
Vinod KC bdd8e1dbb1 [SPARK-28551][SQL] CTAS with LOCATION should not allow to a non-empty directory
### What changes were proposed in this pull request?

CTAS with location clause acts as an insert overwrite. This can cause problems when there are subdirectories within a location directory.
This causes some users to accidentally wipe out directories with very important data. We should not allow CTAS with location to a non-empty directory.

### Why are the changes needed?

Hive already handled this scenario: HIVE-11319

Steps to reproduce:

```scala
sql("""create external table  `demo_CTAS`( `comment` string) PARTITIONED BY (`col1` string, `col2` string) STORED AS parquet location '/tmp/u1/demo_CTAS'""")
sql("""INSERT OVERWRITE TABLE demo_CTAS partition (col1='1',col2='1') VALUES ('abc')""")
sql("select* from demo_CTAS").show
sql("""create table ctas1 location '/tmp/u2/ctas1' as select * from demo_CTAS""")
sql("select* from ctas1").show
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```

Before the fix: Both create table operations will succeed. But values in table ctas1 will be replaced by ctas2 accidentally.

After the fix: `create table ctas2...` will throw `AnalysisException`:

```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```

### Does this PR introduce _any_ user-facing change?
Yes, if the location directory is not empty, CTAS with location will throw AnalysisException

```
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```
```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```

`CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, need to  set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`. , default value is `false`.
Updated SQL migration guide.

### How was this patch tested?
Test case added in SQLQuerySuite.scala

Closes #32411 from vinodkc/br_fixCTAS_nonempty_dir.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-20 06:13:18 +00:00
yi.wu 00b63c8dc2 [SPARK-27991][CORE] Defer the fetch request on Netty OOM
### What changes were proposed in this pull request?

This PR proposes a workaround to address the Netty OOM issue (SPARK-24989, SPARK-27991):

Basically, `ShuffleBlockFetcherIterator` would catch the `OutOfDirectMemoryError` from Netty and then set a global flag for the shuffle module. Any pending fetch requests would be deferred if there're in-flight requests until the flag is unset. And the flag will be unset when there's a fetch request succeed.

Note that catching the Netty OOM rather than abort the application is feasible because Netty manage its own memory region (offheap by default) separately. So Netty OOM doesn't mean the memory shortage of Spark.

### Why are the changes needed?

The Netty OOM issue is a very corner case. It usually happens in the large-scale cluster, where a reduce task could fetch shuffle blocks from hundreds of nodes concurrently in a short time. Internally, we found a cluster that has created 260+ clients within 6s before throwing Netty OOM.

Although Spark has configurations, e.g., `spark.reducer.maxReqsInFlight` to tune the number of concurrent requests, it's usually not a easy decision for the user to set a reasonable value regarding the workloads, machine resources, etc. But with this fix, Spark would heal the Netty memory issue itself without any specific configurations.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #32287 from Ngone51/SPARK-27991.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-20 04:26:56 +00:00
Ashray Jain de59e01aa4 [SPARK-35443][K8S] Mark K8s ConfigMaps and Secrets created by Spark as immutable
Kubernetes supports marking secrets and config maps as immutable to gain performance.

https://kubernetes.io/docs/concepts/configuration/configmap/#configmap-immutable
https://kubernetes.io/docs/concepts/configuration/secret/#secret-immutable

For K8s clusters that run many thousands of Spark applications, this can yield significant reduction in load on the kube-apiserver.

From the K8s docs:

> For clusters that extensively use Secrets (at least tens of thousands of unique Secret to Pod mounts), preventing changes to their data has the following advantages:
> - protects you from accidental (or unwanted) updates that could cause applications outages
> - improves performance of your cluster by significantly reducing load on kube-apiserver, by closing watches for secrets marked as immutable.

For any secrets and config maps we create in Spark that are immutable, we could mark them as immutable by including the following when building the secret/config map
```
.withImmutable(true)
```
This feature has been supported in K8s as beta since K8s 1.19 and as GA since K8s 1.21

### What changes were proposed in this pull request?
All K8s secrets and config maps created by Spark are marked "immutable".

### Why are the changes needed?
See description above.

### Does this PR introduce _any_ user-facing change?
Don't think so

### How was this patch tested?
Augmented existing unit tests.

Closes #32588 from ashrayjain/patch-1.

Authored-by: Ashray Jain <ashrayjain@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-19 21:25:33 -07:00
Xinrong Meng a970f8505d [SPARK-35338][PYTHON] Separate arithmetic operations into data type based structures
### What changes were proposed in this pull request?

The PR is proposed for **pandas APIs on Spark**, in order to separate arithmetic operations shown as below into data-type-based structures.
`__add__, __sub__, __mul__, __truediv__, __floordiv__, __pow__, __mod__,
__radd__, __rsub__, __rmul__, __rtruediv__, __rfloordiv__, __rpow__,__rmod__`

DataTypeOps and subclasses are introduced.

The existing behaviors of each arithmetic operation should be preserved.

### Why are the changes needed?

Currently, the same arithmetic operation of all data types is defined in one function, so it’s difficult to extend the behavior change based on the data types.

Introducing DataTypeOps would be the foundation for [pandas APIs on Spark: Separate basic operations into data type based structures.](https://docs.google.com/document/d/12MS6xK0hETYmrcl5b9pX5lgV4FmGVfpmcSKq--_oQlc/edit?usp=sharing).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests are introduced under pyspark.pandas.tests.data_type_ops. One test file per DataTypeOps class.

Closes #32596 from xinrong-databricks/datatypeop_arith_fix.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-05-19 19:47:00 -07:00
Hyukjin Kwon 7eaabf4df5 [SPARK-35408][PYTHON][FOLLOW-UP] Avoid unnecessary f-string format
### What changes were proposed in this pull request?

This PR avoids using f-string format that's a new feature in Python 3.6. Although it's legitimate to use this syntax because Apache Spark supports Python 3.6+, this breaks unofficial support of Python 3.5.

This specific f-string format looks something unnecessary, and doesn't look worth enough to remove such unofficial support because of one string format in an error message.

**NOTE** that this PR doesn't mean that we're maintaining Python 3.5 since we dropped. It just looks like too much to remove that unofficial support only because of one string format and error message.

### Why are the changes needed?

To keep unofficial Python 3.5 support

### Does this PR introduce _any_ user-facing change?

Officially nope.

### How was this patch tested?

Ran the linters.

Closes #32598 from HyukjinKwon/SPARK-35408=followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 10:47:31 +09:00
Takuya UESHIN c06480519e [SPARK-35450][INFRA] Follow checkout-merge way to use the latest commit for linter, or other workflows
### What changes were proposed in this pull request?

Follows checkout-merge way to use the latest commit for linter, or other workflows.

### Why are the changes needed?

For linter or other workflows besides build-and-tests, we should follow checkout-merge way to use the latest commit; otherwise, those could work on the old settings.

### Does this PR introduce _any_ user-facing change?

No, this is a dev-only change.

### How was this patch tested?

Existing tests.

Closes #32597 from ueshin/issues/SPARK-35450/infra.

Lead-authored-by: Takuya UESHIN <ueshin@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 10:07:28 +09:00
Takuya UESHIN d44e6c7f10 Revert "[SPARK-35338][PYTHON] Separate arithmetic operations into data type based structures"
This reverts commit d1b24d8aba.
2021-05-19 16:49:47 -07:00
Cheng Su 586caae3cc [SPARK-35438][SQL][DOCS] Minor documentation fix for window physical operator
### What changes were proposed in this pull request?

As title. Fixed two places where the documentation for window operator has some error.

### Why are the changes needed?

Help people read code for window operator more easily in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32585 from c21/minor-doc.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-20 08:47:19 +09:00
Xinrong Meng d1b24d8aba [SPARK-35338][PYTHON] Separate arithmetic operations into data type based structures
### What changes were proposed in this pull request?

The PR is proposed for **pandas APIs on Spark**, in order to separate arithmetic operations shown as below into data-type-based structures.
`__add__, __sub__, __mul__, __truediv__, __floordiv__, __pow__, __mod__,
__radd__, __rsub__, __rmul__, __rtruediv__, __rfloordiv__, __rpow__,__rmod__`

DataTypeOps and subclasses are introduced.

The existing behaviors of each arithmetic operation should be preserved.

### Why are the changes needed?

Currently, the same arithmetic operation of all data types is defined in one function, so it’s difficult to extend the behavior change based on the data types.

Introducing DataTypeOps would be the foundation for [pandas APIs on Spark: Separate basic operations into data type based structures.](https://docs.google.com/document/d/12MS6xK0hETYmrcl5b9pX5lgV4FmGVfpmcSKq--_oQlc/edit?usp=sharing).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests are introduced under pyspark.pandas.tests.data_type_ops. One test file per DataTypeOps class.

Closes #32469 from xinrong-databricks/datatypeop_arith.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-05-19 15:05:32 -07:00
Andy Grove 52e3cf9ff5 [SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use
### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes #32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-05-19 07:45:26 -05:00
shahid 12142130cd [SPARK-35362][SQL] Update null count in the column stats for UNION operator stats estimation
### What changes were proposed in this pull request?
Updating column stats for Union operator stats estimation
### Why are the changes needed?
This is a followup PR to update the null count also in the Union stats operator estimation. https://github.com/apache/spark/pull/30334

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated UTs, manual testing

Closes #32494 from shahidki31/shahid/updateNullCountForUnion.

Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-19 21:23:19 +09:00
Kousuke Saruta 9283bebbbd [SPARK-35418][SQL] Add sentences function to functions.{scala,py}
### What changes were proposed in this pull request?

This PR adds `sentences`, a string function, which is present as of `2.0.0` but missing in `functions.{scala,py}`.

### Why are the changes needed?

This function can be only used from SQL for now.
It's good if we can use this function from Scala/Python code as well as SQL.

### Does this PR introduce _any_ user-facing change?

Yes. Users can use this function from Scala and Python.

### How was this patch tested?

New test.

Closes #32566 from sarutak/sentences-function.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-05-19 20:07:28 +09:00
shahid 46f7d780d3 [SPARK-35368][SQL] Update histogram statistics for RANGE operator for stats estimation
### What changes were proposed in this pull request?
Update histogram statistics for RANGE operator stats estimation
### Why are the changes needed?
If histogram optimization is enabled, this statistics can be used in various cost based optimizations.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UTs. Manual test.

Closes #32498 from shahidki31/shahid/histogram.

Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-19 16:49:32 +09:00
Yuzhou Sun a72d05c7e6 [SPARK-35106][CORE][SQL] Avoid failing rename caused by destination directory not exist
### What changes were proposed in this pull request?

1. In HadoopMapReduceCommitProtocol, create parent directory before renaming custom partition path staging files
2. In InMemoryCatalog and HiveExternalCatalog, create new partition directory before renaming old partition path
3. Check return value of FileSystem#rename, if false, throw exception to avoid silent data loss cause by rename failure
4. Change DebugFilesystem#rename behavior to make it match HDFS's behavior (return false without rename when dst parent directory not exist)

### Why are the changes needed?

Depends on FileSystem#rename implementation, when destination directory does not exist, file system may
1. return false without renaming file nor throwing exception (e.g. HDFS), or
2. create destination directory, rename files, and return true (e.g. LocalFileSystem)

In the first case above, renames in HadoopMapReduceCommitProtocol for custom partition path will fail silently if the destination partition path does not exist. Failed renames can happen when
1. dynamicPartitionOverwrite == true, the custom partition path directories are deleted by the job before the rename; or
2. the custom partition path directories do not exist before the job; or
3. something else is wrong when file system handle `rename`

The renames in MemoryCatalog and HiveExternalCatalog for partition renaming also have similar issue.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified DebugFilesystem#rename, and added new unit tests.

Without the fix in src code, five InsertSuite tests and one AlterTableRenamePartitionSuite test failed:
InsertSuite.SPARK-20236: dynamic partition overwrite with custom partition path (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
struct<>                   struct<>
![2,1,1]
```

InsertSuite.SPARK-35106: insert overwrite with custom partition path
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
struct<>                   struct<>
![2,1,1]
```

InsertSuite.SPARK-35106: dynamic partition overwrite with custom partition path
```
== Results ==
!== Correct Answer - 2 ==   == Spark Answer - 1 ==
!struct<>                   struct<i:int,part1:int,part2:int>
 [1,1,1]                    [1,1,1]
![1,1,2]
```

InsertSuite.SPARK-35106: Throw exception when rename custom partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```

InsertSuite.SPARK-35106: Throw exception when rename dynamic partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```

AlterTableRenamePartitionSuite.ALTER TABLE .. RENAME PARTITION V1: multi part partition (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 ==   == Spark Answer - 0 ==
 struct<>                   struct<>
![3,123,3]
```

Closes #32530 from YuzhouSun/SPARK-35106.

Authored-by: Yuzhou Sun <yuzhosun@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-19 15:46:27 +08:00
Kousuke Saruta 0b3758e8cd [SPARK-35421][SS] Remove redundant ProjectExec from streaming queries with V2Relation
### What changes were proposed in this pull request?

This PR fixes an issue that streaming queries with V2Relation can have redundant `ProjectExec` in its physical plan.
You can easily reproduce this issue with the following code.
```
import org.apache.spark.sql.streaming.Trigger

val query = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1000).
  option("rampUpTime", "10s").
  load().
  selectExpr("timestamp", "100",  "value").
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime("5 seconds")).
  // trigger(Trigger.Continuous("5 seconds")). // You can reproduce with continuous processing too.
  outputMode("append").
  start()
```
The plan tree is here.
![ss-before](https://user-images.githubusercontent.com/4736016/118454996-ec439800-b733-11eb-8cd8-ed8af73a91b8.png)

### Why are the changes needed?

For better performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I run the same code above and get the following plan tree.
![ss-after](https://user-images.githubusercontent.com/4736016/118455755-1bf2a000-b734-11eb-999e-4b8c19ad34d7.png)

Closes #32570 from sarutak/fix-redundant-projectexec.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-19 06:19:01 +00:00
yangjie01 b1493d82dd [SPARK-35398][SQL] Simplify the way to get classes from ClassBodyEvaluator in CodeGenerator.updateAndGetCompilationStats method
### What changes were proposed in this pull request?
SPARK-35253 upgraded janino from 3.0.16 to 3.1.4, `ClassBodyEvaluator` provides the `getBytecodes` method to get
the mapping from `ClassFile#getThisClassName` to `ClassFile#toByteArray` directly in this version and we don't need to get this variable by reflection api anymore.

So the main purpose of this pr is simplify the way to get `bytecodes` from `ClassBodyEvaluator` in `CodeGenerator#updateAndGetCompilationStats` method.

### Why are the changes needed?
Code simplification.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action

- Manual test:

1. Define a code fragment to be tested, for example:
```
    val codeBody = s"""
        public java.lang.Object generate(Object[] references) {
          return new TestMetricCode(references);
        }

        class TestMetricCode {

          public TestMetricCode(Object[] references) {
          }

          public long sumOfSquares(long left, long right) {
            return left * left + right * right;
          }
        }
      """
```
2. Create a `ClassBodyEvaluator` and `cook` the `codeBody` as above, the process of creating `ClassBodyEvaluator` can extract from `CodeGenerator#doCompile` method.

3. Get `bytecodes` using `ClassBodyEvaluator#getBytecodes` api(after this pr) and reflection api(before this pr) respectively, then assert that they are the same. If the `bytecodes` not changed, we can be sure that metrics state will not change. The test code example as follows:
```
    import scala.collection.JavaConverters._
    val bytecodesFromApi = evaluator.getBytecodes.asScala
    val bytecodesFromReflectionApi = {
      val scField = classOf[ClassBodyEvaluator].getDeclaredField("sc")
      scField.setAccessible(true)
      val compiler = scField.get(evaluator).asInstanceOf[SimpleCompiler]
      val loader = compiler.getClassLoader.asInstanceOf[ByteArrayClassLoader]
      val classesField = loader.getClass.getDeclaredField("classes")
      classesField.setAccessible(true)
      classesField.get(loader).asInstanceOf[java.util.Map[String, Array[Byte]]].asScala
    }

    assert(bytecodesFromApi == bytecodesFromReflectionApi)
```

Closes #32536 from LuciferYang/SPARK-35253-FOLLOWUP.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-19 13:03:35 +09:00
Erik Krogen 186477c60e [SPARK-35263][TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code
### What changes were proposed in this pull request?
Introduce new shared methods to `ShuffleBlockFetcherIteratorSuite` to replace copy-pasted code. Use modern, Scala-like Mockito `Answer` syntax.

### Why are the changes needed?
`ShuffleFetcherBlockIteratorSuite` has tons of duplicate code, like 0494dc90af/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala (L172-L185) . It's challenging to tell what the interesting parts are vs. what is just being set to some default/unused value.

Similarly but not as bad, there are many calls like the following
```
verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer ...
```

These changes result in about 10% reduction in both lines and characters in the file:
```bash
# Before
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
    1063    3950   43201 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

# After
> wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
     928    3609   39053 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
```

It also helps readability, e.g.:
```
    val iterator = createShuffleBlockIteratorWithDefaults(
      transfer,
      blocksByAddress,
      maxBytesInFlight = 1000L
    )
```
Now I can clearly tell that `maxBytesInFlight` is the main parameter we're interested in here.

### Does this PR introduce _any_ user-facing change?
No, test only. There aren't even any behavior changes, just refactoring.

### How was this patch tested?
Unit tests pass.

Closes #32389 from xkrogen/xkrogen-spark-35263-refactor-shuffleblockfetcheriteratorsuite.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-05-18 22:37:47 -05:00
Kousuke Saruta 8c70c17545 [SPARK-35434][BUILD] Upgrade scalatestplus artifacts to 3.2.9.0
### What changes were proposed in this pull request?

This PR upgrades the scalatestplus artifacts and scalacheck.

### Why are the changes needed?

scalatestplus artifacts Spark uses are two years old and these artifacts are currently renamed.
So, let's follow up.
Also, the latest releases seem to support Scala 3.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA passed on my repository.

Closes #32581 from sarutak/upgrade-scalatestplus.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-18 09:33:11 -07:00
Tengfei Huang 9804f07c17 [SPARK-35411][SQL] Add essential information while serializing TreeNode to json
### What changes were proposed in this pull request?
Write out Seq of product objects which contain TreeNode, to avoid the cases as described in https://issues.apache.org/jira/browse/SPARK-35411 that essential information will be ignored and just written out as null values. These information are necessary to understand the query plans.

### Why are the changes needed?
Information like cteRelations in With node, and branches in CaseWhen expression are necessary to understand the query plans, they should be written out to the result json string.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT case added.

Closes #32557 from ivoson/plan-json-fix.

Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-18 23:20:12 +08:00
Takeshi Yamamuro 746d80d87a [SPARK-35422][SQL] Fix plan-printing issues to pass the TPCDS plan stability tests in Scala v2.13
### What changes were proposed in this pull request?

To pass the TPCDS-related plan stability tests in scala-2.13, this PR proposes to fix two things below;
 - (1) Sorts elements in the predicate `InSet` and the source filter `In` for printing their nodes.
 - (2) Formats nested collection elements (`Seq`, `Array`, and `Set`) recursively in `TreeNode.argString`.

As for (1), it seems v2.12/v2.13 prints `Set` elements with a different order, so we need to sort them explicitly. As for (2), the `Seq` implementation is different between v2.12/v2.13, so we need to format nested  `Seq` elements correctly to hide the name of its implementation (See an example below);
```
 (74) Expand [codegen id : 20]
 Input [5]: [sales#41, RETURNS#42, profit#43, channel#44, id#45]
-Arguments: [ArrayBuffer(sales#41, returns#42, ...    <-- scala-2.12
+Arguments: [Vector(sales#41, returns#42, ...         <-- scala-2.13

+Arguments: [[(sales#41, returns#42, ...              <--  the proposed fix to hide the name of its implementation
```

### Why are the changes needed?

To pass the tests in Scala v2.13.

### Does this PR introduce _any_ user-facing change?

Yes, this fix changes query explain strings.

### How was this patch tested?

Manually checked.

Closes #32577 from maropu/FixTPCDSTestIssueInScala213.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-18 08:50:00 +00:00
Chao Sun 44d762abc6 [SPARK-35389][SQL] V2 ScalarFunction should support magic method with null arguments
### What changes were proposed in this pull request?

When creating `Invoke` and `StaticInvoke` for `ScalarFunction`'s magic method, set `propagateNull` to false.

### Why are the changes needed?

When `propgagateNull` is true (which is the default value), `Invoke` and `StaticInvoke` will return null if any of the argument is null. For scalar function this is incorrect, as we should leave the logic to function implementation instead.

### Does this PR introduce _any_ user-facing change?

Yes. Now null arguments shall be properly handled with magic method.

### How was this patch tested?

Added new tests.

Closes #32553 from sunchao/SPARK-35389.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-18 08:45:55 +00:00
Cheng Su cce0048c78 [SPARK-35351][SQL] Add code-gen for left anti sort merge join
### What changes were proposed in this pull request?

As title. This PR is to add code-gen support for LEFT ANTI sort merge join. The main change is to extract `loadStreamed` in `SortMergeJoinExec.doProduce()`. That is to set all columns values for streamed row, when the streamed row has no output row.

Example query:

```
val df1 = spark.range(10).select($"id".as("k1"))
val df2 = spark.range(4).select($"id".as("k2"))
df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti")
```

Example generated code:

```
== Subtree 5 / 5 (maxMethodCodeSize:296; maxConstantPoolSize:156(0.24% used); numInnerClasses:0) ==
*(5) Project [id#0L AS k1#2L]
+- *(5) SortMergeJoin [id#0L], [k2#6L], LeftAnti
   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 5), ENSURE_REQUIREMENTS, [id=#27]
   :     +- *(1) Range (0, 10, step=1, splits=2)
   +- *(4) Sort [k2#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(k2#6L, 5), ENSURE_REQUIREMENTS, [id=#33]
         +- *(3) Project [id#4L AS k2#6L]
            +- *(3) Range (0, 4, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage5(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=5
/* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator smj_streamedInput_0;
/* 010 */   private scala.collection.Iterator smj_bufferedInput_0;
/* 011 */   private InternalRow smj_streamedRow_0;
/* 012 */   private InternalRow smj_bufferedRow_0;
/* 013 */   private long smj_value_2;
/* 014 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches_0;
/* 015 */   private long smj_value_3;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     smj_streamedInput_0 = inputs[0];
/* 026 */     smj_bufferedInput_0 = inputs[1];
/* 027 */
/* 028 */     smj_matches_0 = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(1, 2147483647);
/* 029 */     smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     smj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */   }
/* 033 */
/* 034 */   private boolean findNextJoinRows(
/* 035 */     scala.collection.Iterator streamedIter,
/* 036 */     scala.collection.Iterator bufferedIter) {
/* 037 */     smj_streamedRow_0 = null;
/* 038 */     int comp = 0;
/* 039 */     while (smj_streamedRow_0 == null) {
/* 040 */       if (!streamedIter.hasNext()) return false;
/* 041 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
/* 042 */       long smj_value_0 = smj_streamedRow_0.getLong(0);
/* 043 */       if (false) {
/* 044 */         if (!smj_matches_0.isEmpty()) {
/* 045 */           smj_matches_0.clear();
/* 046 */         }
/* 047 */         return false;
/* 048 */
/* 049 */       }
/* 050 */       if (!smj_matches_0.isEmpty()) {
/* 051 */         comp = 0;
/* 052 */         if (comp == 0) {
/* 053 */           comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0);
/* 054 */         }
/* 055 */
/* 056 */         if (comp == 0) {
/* 057 */           return true;
/* 058 */         }
/* 059 */         smj_matches_0.clear();
/* 060 */       }
/* 061 */
/* 062 */       do {
/* 063 */         if (smj_bufferedRow_0 == null) {
/* 064 */           if (!bufferedIter.hasNext()) {
/* 065 */             smj_value_3 = smj_value_0;
/* 066 */             return !smj_matches_0.isEmpty();
/* 067 */           }
/* 068 */           smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
/* 069 */           long smj_value_1 = smj_bufferedRow_0.getLong(0);
/* 070 */           if (false) {
/* 071 */             smj_bufferedRow_0 = null;
/* 072 */             continue;
/* 073 */           }
/* 074 */           smj_value_2 = smj_value_1;
/* 075 */         }
/* 076 */
/* 077 */         comp = 0;
/* 078 */         if (comp == 0) {
/* 079 */           comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0);
/* 080 */         }
/* 081 */
/* 082 */         if (comp > 0) {
/* 083 */           smj_bufferedRow_0 = null;
/* 084 */         } else if (comp < 0) {
/* 085 */           if (!smj_matches_0.isEmpty()) {
/* 086 */             smj_value_3 = smj_value_0;
/* 087 */             return true;
/* 088 */           } else {
/* 089 */             return false;
/* 090 */           }
/* 091 */         } else {
/* 092 */           if (smj_matches_0.isEmpty()) {
/* 093 */             smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
/* 094 */           }
/* 095 */
/* 096 */           smj_bufferedRow_0 = null;
/* 097 */         }
/* 098 */       } while (smj_streamedRow_0 != null);
/* 099 */     }
/* 100 */     return false; // unreachable
/* 101 */   }
/* 102 */
/* 103 */   protected void processNext() throws java.io.IOException {
/* 104 */     while (smj_streamedInput_0.hasNext()) {
/* 105 */       findNextJoinRows(smj_streamedInput_0, smj_bufferedInput_0);
/* 106 */
/* 107 */       long smj_value_4 = -1L;
/* 108 */       smj_value_4 = smj_streamedRow_0.getLong(0);
/* 109 */       scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
/* 110 */
/* 111 */       boolean wholestagecodegen_hasOutputRow_0 = false;
/* 112 */
/* 113 */       while (!wholestagecodegen_hasOutputRow_0 && smj_iterator_0.hasNext()) {
/* 114 */         InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next();
/* 115 */
/* 116 */         wholestagecodegen_hasOutputRow_0 = true;
/* 117 */       }
/* 118 */
/* 119 */       if (!wholestagecodegen_hasOutputRow_0) {
/* 120 */         // load all values of streamed row, because the values not in join condition are not
/* 121 */         // loaded yet.
/* 122 */
/* 123 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 124 */
/* 125 */         // common sub-expressions
/* 126 */
/* 127 */         smj_mutableStateArray_0[1].reset();
/* 128 */
/* 129 */         smj_mutableStateArray_0[1].write(0, smj_value_4);
/* 130 */         append((smj_mutableStateArray_0[1].getRow()).copy());
/* 131 */
/* 132 */       }
/* 133 */       if (shouldStop()) return;
/* 134 */     }
/* 135 */     ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources();
/* 136 */   }
/* 137 */
/* 138 */ }
```

### Why are the changes needed?

Improve the query CPU performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `WholeStageCodegenSuite.scala`, and existed unit test in `ExistenceJoinSuite.scala`.

Closes #32547 from c21/smj-left-anti.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-05-18 16:56:45 +09:00
Kousuke Saruta 7b942d523c [SPARK-35425][BUILD] Pin jinja2 in spark-rm/Dockerfile and add as a required dependency in the release README.md
### What changes were proposed in this pull request?

The following two things are done in this PR.

* Add note about Jinja2 as a required dependency for document build.
* Add Jinja2 dependency for the document build to `spark-rm/Dockerfile`

### Why are the changes needed?

SPARK-35375(#32509) confined the version of Jinja to <3.0.0.
So it's good to note about it in `docs/README.md` and add the dependency to `spark-rm/Dockerfile`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confimed that `make html` succeed under `python/docs` with the following command.
```
sudo pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0'
```

Closes #32573 from sarutak/required-module-for-python-doc.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-05-18 16:48:23 +09:00
Takeshi Yamamuro 3b859a16c0 [SPARK-35431][SQL][TESTS] Sort elements generated by collect_set in SQLQueryTestSuite
### What changes were proposed in this pull request?

To pass `subquery/scalar-subquery/scalar-subquery-select.sql` (`SQLQueryTestSuite`) in Scala v2.13,  this PR proposes to change the aggregate expr of a test query in the file from `collect_set(...)` to `sort_array(collect_set(...))` because `collect_set` depends on the `mutable.HashSet` implementation and elements in the set are printed in a different order in Scala v2.12/v2.13.

### Why are the changes needed?

To pass the test in Scala v2.13.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually checked.

Closes #32578 from maropu/FixSQLTestIssueInScala213.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-17 22:51:32 -07:00
Hyukjin Kwon 747fe7282c [SPARK-35419][PYTHON] Enable spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled by default
### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/30309 added a configuration (disabled by default) that simplifies the error messages from Python UDFS, which removed internal stacktrace from Python workers:

```python
from pyspark.sql.functions import udf; spark.range(10).select(udf(lambda x: x/0)("id")).collect()
```

**Before**

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../python/pyspark/sql/utils.py", line 127, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
  An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```

**After**

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../python/pyspark/sql/utils.py", line 127, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
  An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
  File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```

Note that the traceback (`return f(*args, **kwargs)`) is almost always same - I would say more than 99%. For 1% case, we can guide developers to enable this configuration for further debugging.

In Databricks, it has been enabled for around 6 months, and I have had zero negative feedback on it.

### Why are the changes needed?

To show simplified exception messages to end users.

### Does this PR introduce _any_ user-facing change?

Yes, it will hide the internal Python worker traceback.

### How was this patch tested?

Existing test cases should cover.

Closes #32569 from HyukjinKwon/SPARK-35419.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-18 12:27:09 +09:00
Chao Sun a60c36458d [SPARK-34981][SQL][TESTS][FOLLOWUP] Fix test failure under Scala 2.13
### What changes were proposed in this pull request?

Fix test failure under Scala 2.13 by making test `ScalaFunction` `StrLenMagic` public.

### Why are the changes needed?

A few tests are failing when using Scala 2.13 with error message like the following:
```
[info]   Cause: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 35, Column 121: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 35, Column 121: No a
pplicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public int org.apache.spark.sql.connector.DataSourceV2FunctionSuite$StrLenMagic$.invoke(org.apache.spark.
unsafe.types.UTF8String)"
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.compilerError(QueryExecutionErrors.scala:387)
[info]   at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1415)
[info]   at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1501)
```

This seems to be caused by the fact that the `StrLenMagic` is using `private` scope. After removing the `private` keyword the tests are now passing.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

```
$ dev/change-scala-version.sh 2.13
$ build/sbt "sql/testOnly *.DataSourceV2FunctionSuite" -Pscala-2.13
```

Closes #32575 from sunchao/SPARK-34981-follow-up.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-17 16:17:01 -07:00
Takuya UESHIN 2a335f2d7d [SPARK-34941][PYTHON] Fix mypy errors and enable mypy check for pandas-on-Spark
### What changes were proposed in this pull request?

Fixes `mypy` errors and enables `mypy` check for pandas-on-Spark.

### Why are the changes needed?

The `mypy` check for pandas-on-Spark was disabled when the initial porting.
It should be enabled again; otherwise we will miss type checking errors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The enabled `mypy` check and existing unit tests should pass.

Closes #32540 from ueshin/issues/SPARK-34941/pandas_mypy.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-05-17 10:46:59 -07:00
fhygh 3a3f8ca6f4 [SPARK-35359][SQL] Insert data with char/varchar datatype will fail when data length exceed length limitation
### What changes were proposed in this pull request?
This PR is used to fix this bug:

```
set spark.sql.legacy.charVarcharAsString=true;
create table chartb01(a char(3));
insert into chartb01 select 'aaaaa';
```

here we expect the data of table chartb01 is 'aaa', but it runs failed.

### Why are the changes needed?
Improve backward compatibility

```
spark-sql>
         > create table tchar01(col char(2)) using parquet;
Time taken: 0.767 seconds
spark-sql>
         > insert into tchar01 select 'aaa';
ERROR | Executor task launch worker for task 0.0 in stage 0.0 (TID 0) | Aborting task | org.apache.spark.util.Utils.logError(Logging.scala:94)
java.lang.RuntimeException: Exceeds char/varchar type length limitation: 2
        at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:31)
        at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:44)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:279)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1500)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:288)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:212)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1466)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

### Does this PR introduce _any_ user-facing change?
No (the legacy config is false by default).

### How was this patch tested?
Added unit tests.

Closes #32501 from fhygh/master.

Authored-by: fhygh <283452027@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-18 00:13:40 +08:00
Wenchen Fan 3b63f32601 [SPARK-35400][SQL] Simplify getOuterReferences and improve error message for correlated subquery
### What changes were proposed in this pull request?

Spark doesn't support aggregate functions with mixed outer and local references. This PR applies this check earlier to fail with a clear error message instead of some weird ones, and simplifies the related code in `SubExprUtils.getOuterReferences`. This PR also refines the error message a bit.

### Why are the changes needed?

better error message

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated tests

Closes #32503 from cloud-fan/try.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-05-17 14:13:44 +00:00
Chris Thomas ceb8122c40 [SPARK-35399][DOCUMENTATION] State is still needed in the event of executor failure
### What changes were proposed in this pull request?

Fix incorrect statement that state is no longer needed in the event of executor failure and document that it is needed in the case of a flaky app causing occasional executor failure.

SO [discussion](https://stackoverflow.com/questions/67466878/can-spark-with-external-shuffle-service-use-saved-shuffle-files-in-the-event-of/67507439#67507439).

### Why are the changes needed?

To fix the documentation and guide users as to additional use case for the Shuffle Service.

### Does this PR introduce _any_ user-facing change?

Documentation only.

### How was this patch tested?

N/A.

Closes #32538 from chrisheaththomas/shuffle-service-and-executor-failure.

Authored-by: Chris Thomas <chrisheaththomas@hotmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-05-17 08:58:46 -05:00
Kousuke Saruta b4348b7e56 [SPARK-35420][BUILD] Replace the usage of toStringHelper with ToStringBuilder
### What changes were proposed in this pull request?

This PR replaces `toStringHelper`, an API which breaks in Guava 27.

### Why are the changes needed?

SPARK-30272 (#26911) removed usages which breaks in Guava 27 but `toStringHelper` is instroduced again.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Build successfully finished with the following command.
```
build/sbt -Dguava.version=27.0-jre -Phive -Phive-thriftserver -Pyarn -Pmesos -Pkubernetes -Phadoop-cloud -Pdocker-integration-tests -Pkubernetes-integration-tests -Pkinesis-asl -Pspark-ganglia-lgpl package
```

Closes #32567 from sarutak/remove-old-guava-usage.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-05-17 21:46:35 +09:00
Jungtaek Lim 7c13636be3 [SPARK-34888][SS] Introduce UpdatingSessionIterator adjusting session window on elements
Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR introduces UpdatingSessionsIterator, which analyzes neighbor elements and adjust session information on elements.

UpdatingSessionsIterator calculates and updates the session window for each element in the given iterator, which makes elements in the same session window having same session spec. Downstream can apply aggregation to finally merge these elements bound to the same session window.

UpdatingSessionsIterator works on the precondition that given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort.

UpdatingSessionsIterator copies the elements to safely update on each element, as well as buffers elements which are bound to the same session window. Due to such overheads, MergingSessionsIterator which will be introduced via SPARK-34889 should be used whenever possible.

This PR also introduces UpdatingSessionsExec which is the physical node on leveraging UpdatingSessionsIterator to sort the input rows and updates session information on input rows.

### Why are the changes needed?

This part is a one of required on implementing SPARK-10816.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite added.

Closes #31986 from HeartSaVioR/SPARK-34888-SPARK-10816-PR-31570-part-1.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-05-17 21:05:49 +09:00
Gera Shegalov 9eb45ecb4f [SPARK-35408][PYTHON] Improve parameter validation in DataFrame.show
### What changes were proposed in this pull request?
Provide clearer error message tied to the user's Python code if incorrect parameters are passed to `DataFrame.show` rather than the message about a missing JVM method the user is not calling directly.

```
py4j.Py4JException: Method showString([class java.lang.Boolean, class java.lang.Integer, class java.lang.Boolean]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748
```

### Why are the changes needed?
For faster debugging through actionable error message.

### Does this PR introduce _any_ user-facing change?
No change for the correct parameters but different error messages for the parameters triggering an exception.

### How was this patch tested?
- unit test
- manually in PySpark REPL

Closes #32555 from gerashegalov/df_show_validation.

Authored-by: Gera Shegalov <gera@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-17 16:22:46 +09:00
Dongjoon Hyun 4c015555da [SPARK-35416][K8S] Support PersistentVolumeClaim Reuse
### What changes were proposed in this pull request?

This PR aims to add a new configuration, `spark.kubernetes.driver.reusePersistentVolumeClaim`, to reuse driver-owned `PersistentVolumeClaims` of the **deleted** executor pods.

Note also that `driver-owned PersistentVolumeClaims` is controlled by `spark.kubernetes.driver.ownPersistentVolumeClaim` which is recently added.

### Why are the changes needed?

PVC creations take some times. This feature can reduce it by reusing it.

For example, we can start `Pi` app with two executors with PVCs.
```
$ k logs -f pi | grep ExecutorPodsAllocator
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 0.
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-1-pvc-0 with StorageClass scaleio
21/05/16 23:36:33 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-2-pvc-0 with StorageClass scaleio
```

After killing one executor, Spark is trying to look up the reusable PVCs, but the dead-executor's PVC may not returned yet because K8s works asynchronously. In this case, Spark is trying to create a new PVC as a normal operation.
```
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 2 PVCs
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-3-pvc-0 with StorageClass scaleio
```

After killing another executor, Spark found one reusable PVC, `pi-exec-1-pvc-0`, and reuse it.
```
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Found 1 reusable PVCs from 3 PVCs
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Reuse PersistentVolumeClaim pi-exec-1-pvc-0
```

In this case, we can easily notice the remounted PVC because `ClaimName`, `pi-exec-1-pvc-0`, doesn't have the prefix of pod name, `pi-exec-4`.
```
$ k describe pod pi-exec-4 | grep pi-exec-1-pvc-0
    ClaimName:  pi-exec-1-pvc-0
```

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new feature which is disabled by the new conf.

### How was this patch tested?

Pass the CIs with the newly added test case.

K8S IT test also passed.
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 17 minutes, 7 seconds.
Total number of tests run: 26
Suites: completed 2, aborted 0
Tests: succeeded 26, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:14 min
[INFO] Finished at: 2021-05-16T17:24:40-07:00
[INFO] ------------------------------------------------------------------------
```

Closes #32564 from dongjoon-hyun/SPARK-35416.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-17 00:20:48 -07:00
Yuming Wang fb9316388a [SPARK-32792][SQL][FOLLOWUP] Fix conflict with SPARK-34661
### What changes were proposed in this pull request?

This fixes the compilation error due to the logical conflicts between https://github.com/apache/spark/pull/31776 and https://github.com/apache/spark/pull/29642 .

### Why are the changes needed?

To recover compilation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #32568 from wangyum/HOT-FIX.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-05-16 22:12:52 -07:00