Commit graph

30533 commits

Author SHA1 Message Date
gengjiaan 8013f985a4 [SPARK-35378][SQL] Eagerly execute commands in QueryExecution instead of caller sides
### What changes were proposed in this pull request?
Currently, Spark eagerly executes commands on the caller side of `QueryExecution`, which is a bit hacky as `QueryExecution` is not aware of it and leads to confusion.

For example, if you run `sql("show tables").collect()`, you will see two queries with identical query plans in the web UI.
![image](https://user-images.githubusercontent.com/3182036/121193729-a72d0480-c8a0-11eb-8b12-379019607ad5.png)
![image](https://user-images.githubusercontent.com/3182036/121193822-bc099800-c8a0-11eb-9d2a-34ab1329e2f7.png)
![image](https://user-images.githubusercontent.com/3182036/121193845-c0ce4c00-c8a0-11eb-96d0-ef604a4dfab0.png)

The first query is triggered at `Dataset.logicalPlan`, which eagerly executes the command.
The second query is triggered at `Dataset.collect`, which is the normal query execution.

From the web UI, it's hard to tell that these two queries are caused by eager command execution.

This PR proposes to move the eager command execution to `QueryExecution` and turn the command plan into `CommandResult` to indicate that the command has already been executed. Now `sql("show tables").collect()` still triggers two queries, but the query plans are not identical. The second query becomes:
![image](https://user-images.githubusercontent.com/3182036/121194850-b3659180-c8a1-11eb-9abf-2980f84f089d.png)
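
The gist of the change can be illustrated with a small, self-contained toy sketch (hypothetical classes, not Spark's actual `QueryExecution`/`CommandResult` implementation): the execution layer runs command plans eagerly, exactly once, and rewrites them into a pre-computed result node, so a later `collect()` only returns the cached rows.

```scala
// Toy sketch of eager command execution inside the query-execution layer.
object EagerCommandSketch extends App {
  sealed trait Plan
  case class Command(name: String, run: () => Seq[String]) extends Plan
  case class CommandResult(rows: Seq[String]) extends Plan   // marks "already executed"

  class QueryExecution(logical: Plan) {
    // Commands are executed eagerly, exactly once, while building QueryExecution.
    val commandExecuted: Plan = logical match {
      case c: Command => CommandResult(c.run())
      case other      => other
    }

    def collect(): Seq[String] = commandExecuted match {
      case CommandResult(rows) => rows   // no second execution of the command
      case _                   => Seq.empty
    }
  }

  val qe = new QueryExecution(Command("SHOW TABLES", () => Seq("t1", "t2")))
  println(qe.collect())   // List(t1, t2) -- the command already ran during construction
}
```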

In addition to the UI improvements, this PR also has other benefits:
1. Simplifies the code, as the caller side no longer needs to worry about eager command execution; `QueryExecution` takes care of it.
2. It helps https://github.com/apache/spark/pull/32442, where there can be more plan nodes above commands, and we need to replace commands with something like a local relation that produces unsafe rows.

### Why are the changes needed?
Explained above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing tests

Closes #32513 from beliefer/SPARK-35378.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-09 04:45:44 +00:00
Hyukjin Kwon afff42178c [SPARK-35647][PYTHON][DOCS] Restructure User Guide in PySpark documentation
### What changes were proposed in this pull request?

This PR proposes to restructure User Guide in PySpark documentation for pandas APIs on Spark.

**Before**

![Screen Shot 2021-06-08 at 8 47 41 PM](https://user-images.githubusercontent.com/6477701/121179493-cb85e280-c89a-11eb-8b93-552ebe7cd0a8.png)

**After**

![Screen Shot 2021-06-08 at 8 46 58 PM](https://user-images.githubusercontent.com/6477701/121179419-b3ae5e80-c89a-11eb-82a0-6dabbf1de12d.png)

Note that I mostly just moved the contents around, except for minor changes:
- Removed some questions in the FAQ that don't make sense in Apache Spark
- Renamed the subtitle "Working with pandas and PySpark" to "From/to pandas and PySpark DataFrames"

Renaming Koalas to either pandas-on-Spark or pandas APIs on Spark will be done in SPARK-35591.

### Why are the changes needed?

For better readability.

### Does this PR introduce _any_ user-facing change?

Yes, it restructures the documentation as shown above.

### How was this patch tested?

I manually built the docs and tested.

Closes #32820 from HyukjinKwon/SPARK-35647.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-09 12:13:25 +09:00
Hyukjin Kwon d12c1472f6 [SPARK-35682][PYTHON][TESTS] Pin mypy==0.812 in GitHub Actions CI
### What changes were proposed in this pull request?

It seems a new MyPy version (0.901) was released and it broke the CI: https://github.com/python/mypy/releases.

```
python/pyspark/pandas/indexes/base.py:2007: error: Argument 1 to "from_tuples" of "MultiIndex" has incompatible type "Index"; expected "List[Tuple[Any, ...]]"
python/pyspark/testing/pandasutils.py:41: error: Library stubs not installed for "tabulate" (or incompatible with Python 3.6)
python/pyspark/testing/pandasutils.py:41: note: Hint: "python3 -m pip install types-tabulate"
python/pyspark/testing/pandasutils.py:41: note: (or run "mypy --install-types" to install all missing stub packages)
python/pyspark/testing/pandasutils.py:41: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports
Found 2 errors in 2 files (checked 312 source files)
```

I tried to fix these instances and pin it to the latest version (0.901). However, I realised that `python/pyspark/pandas/indexes/base.py:2007` has a logic issue (see https://github.com/databricks/koalas/pull/1325#discussion_r647889901 and https://github.com/databricks/koalas/pull/1325#discussion_r647890007) which cannot be fixed quickly.

Therefore, for now I decided to pin it to the version we used before (0.812), in order to unblock other PRs' builds.

### Why are the changes needed?

To unblock other PRs.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

I tested locally, but it has to be tested and pass in GitHub Actions in this PR.

Closes #32829 from HyukjinKwon/SPARK-35682.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-09 11:05:16 +09:00
Gengliang Wang 1b1a8e4eee [SPARK-30993][FOLLOWUP][SQL] Refactor LocalDateTimeUDT as YearUDT in UserDefinedTypeSuite
### What changes were proposed in this pull request?

Refactor LocalDateTimeUDT as YearUDT in UserDefinedTypeSuite

### Why are the changes needed?

As we are going to support java.time.LocalDateTime as an external type of the TimestampWithoutTZ type in https://github.com/apache/spark/pull/32814, registering java.time.LocalDateTime as a UDT will cause test failures: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139469/testReport/
This PR is to unblock https://github.com/apache/spark/pull/32814.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32824 from gengliangwang/UDTFollowUp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-09 10:02:37 +08:00
liuqi e79dd89cf6 [SPARK-35512][PYTHON] Fix OverflowError(cannot convert float infinity to integer) in partitionBy function
### What changes were proposed in this pull request?
Limit the batch size for `add_shuffle_key` in the `partitionBy` function to fix `OverflowError: cannot convert float infinity to integer`.

### Why are the changes needed?
It's not easy to write a UT, but I can use some simple code to explain the bug.
* Original code
```
        def add_shuffle_key(split, iterator):

            buckets = defaultdict(list)
            c, batch = 0, min(10 * numPartitions, 1000)

            for k, v in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
                c += 1

                # check used memory and avg size of chunk of objects
                if (c % 1000 == 0 and get_used_memory() > limit
                        or c > batch):
                    n, size = len(buckets), 0
                    for split in list(buckets.keys()):
                        yield pack_long(split)
                        d = outputSerializer.dumps(buckets[split])
                        del buckets[split]
                        yield d
                        size += len(d)

                    avg = int(size / n) >> 20
                    # let 1M < avg < 10M
                    if avg < 1:
                        batch *= 1.5
                    elif avg > 10:
                        batch = max(int(batch / 1.5), 1)
                    c = 0
```
If `get_used_memory() > limit` is always `True` and `avg < 1` is always `True`, the variable `batch` will grow towards infinity; then `batch = max(int(batch / 1.5), 1)` may raise `OverflowError` once `avg > 10`.
* Sample code to reproduce the bug
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = batch * 1.5
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
It's not easy to write a UT; here is some sample code to test:
```
import sys

limit = 100
used_memory = 200
numPartitions = 64
c, batch = 0, min(10 * numPartitions, 1000)

while True:
    c += 1
    if (c % 1000 == 0 and used_memory > limit or c > batch):
        batch = min(sys.maxsize, batch * 1.5)
        d = max(int(batch / 1.5), 1)
        print(c, batch)
```

Closes #32667 from nolanliou/fix_partitionby.

Authored-by: liuqi <nolan.liou@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-09 10:57:27 +09:00
Kousuke Saruta 93a9dc479c [SPARK-35602][SS] Update state schema to be able to accept long length JSON
### What changes were proposed in this pull request?

This PR fixes an issue where neither the key nor the value of the state schema can accept long (>65535 bytes) JSON.
To solve the problem explained below, the JSON-represented schema is divided into chunks of at most 65535 bytes, and each chunk is written with `DataOutputStream.writeUTF`.

As the solution changes the format of the schema, the version is also changed from `1` to `2`, but the old-version schema is still accepted to ensure backward compatibility.
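
As a rough illustration of the chunking idea (a hedged sketch, not the actual state schema file format introduced by this PR), the JSON can be split into pieces that each fit `writeUTF`'s 2-byte length prefix. Chunking by character count below assumes ASCII JSON; the real format has to account for the encoded byte length.

```scala
import java.io.{DataInputStream, DataOutputStream}

object ChunkedSchemaIO {
  private val MaxChunkLen = 65535

  def writeSchemaJson(out: DataOutputStream, schemaJson: String): Unit = {
    val chunks = schemaJson.grouped(MaxChunkLen).toSeq
    out.writeInt(chunks.length)      // record the number of chunks first
    chunks.foreach(out.writeUTF)     // each chunk now fits writeUTF's 65535-byte limit
  }

  def readSchemaJson(in: DataInputStream): String = {
    val numChunks = in.readInt()
    (0 until numChunks).map(_ => in.readUTF()).mkString
  }
}
```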

### Why are the changes needed?

In the current implementation, writing the state schema fails with `UTFDataFormatException` if the length of the schema exceeds 65535 bytes.
This is due to the limitation of `DataOutputStream.writeUTF`:
`writeUTF` writes a length field first, and it is 2 bytes wide, meaning the maximum content length is limited to `2^16-1` = `65535` bytes.
https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html#writeUTF-java.lang.String-

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #32788 from sarutak/fix-UTFDataFormatException.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-09 10:09:57 +09:00
Chao Sun 66e38f48fe [SPARK-35384][SQL][FOLLOWUP] Fix Scala doc for removed method parameters
### What changes were proposed in this pull request?

Fix Scala doc for removed parameters for `InvokeLike.invoke`.

### Why are the changes needed?

#32532 forgot to update the Scala doc after removing 2 parameters for `InvokeLike.invoke`. This fixes it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #32827 from sunchao/SPARK-35384-followup.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-08 15:52:10 -07:00
Liang-Chi Hsieh 1226b9badd [SPARK-35659][SS] Avoid write null to StateStore
### What changes were proposed in this pull request?

This patch removes the practice of putting null values into the StateStore.

### Why are the changes needed?

According to the `get` method doc in the `StateStore` API, it returns a non-null row if the key exists. So we should avoid writing null to the `StateStore`: otherwise you cannot distinguish whether a returned null row means the key doesn't exist or the value is actually null. And due to the defined behavior of `get`, it is quite easy to cause an NPE if the caller believes the key exists and doesn't expect to get a null.
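
A minimal, self-contained sketch of the guard (a hypothetical store trait, not the actual stream-stream join state manager code): never put a null value; remove the key instead, so a null result from `get` always means the key is absent.

```scala
object StateStoreNullGuard {
  trait SimpleStateStore {
    def get(key: String): AnyRef            // contract: returns a non-null row iff the key exists
    def put(key: String, value: AnyRef): Unit
    def remove(key: String): Unit
  }

  def safePut(store: SimpleStateStore, key: String, value: AnyRef): Unit = {
    if (value == null) store.remove(key)    // keep the "non-null iff present" invariant
    else store.put(key, value)
  }
}
```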

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test.

Closes #32796 from viirya/fix-ss-joinstatemanager.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-08 09:10:19 -07:00
Satish Gopalani 2a331177ba [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger
### What changes were proposed in this pull request?
This patch introduces a new option, minOffsetsPerTrigger, to specify the minimum number of offsets to read per trigger, together with maxTriggerDelay to avoid waiting indefinitely for a trigger.

This new option allows skipping a trigger/batch when the number of records available in Kafka is low. This is a very useful feature in cases where there is a sudden burst of data at certain intervals in a day and the data volume is low for the rest of the day.
The 'maxTriggerDelay' option helps avoid an indefinite delay in scheduling the trigger: the trigger will happen irrespective of the records available once the maxTriggerDelay time has elapsed since the last trigger. It is an optional parameter with a default value of 15 minutes, and it is only applicable if minOffsetsPerTrigger is set.

The minOffsetsPerTrigger option is of course optional, but once specified it takes precedence over maxOffsetsPerTrigger, which will be honored only after minOffsetsPerTrigger is satisfied.
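
A hedged usage sketch of the new options (option names as described in this PR; broker, topic, and values are illustrative, and a Spark build with the Kafka connector is assumed):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-min-offsets").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("minOffsetsPerTrigger", "100000")   // wait until at least this many new offsets are available...
  .option("maxTriggerDelay", "15m")           // ...but never longer than this since the last trigger
  .option("maxOffsetsPerTrigger", "500000")   // upper bound, honored once the minimum is satisfied
  .load()
```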

### Why are the changes needed?
There are many scenarios where there is a sudden burst of data at certain intervals in a day and the data volume is low for the rest of the day. Tuning such jobs is difficult: decreasing the trigger processing time increases the number of batches, and hence cluster resource usage, and adds to small-file issues, while increasing the trigger processing time adds consumer lag. This patch tries to address this issue.

### How was this patch tested?
This patch was tested by adding test cases, as well as manually on a cluster where the job ran for a full day with a data burst happening once a day.
Here is a picture of the data burst and the resulting consumer lag:
<img width="1198" alt="Screenshot 2021-04-29 at 11 39 35 PM" src="https://user-images.githubusercontent.com/1044003/116997587-9b2ab180-acfa-11eb-91fd-524802ce3316.png">

This is how the job behaved at burst time running every 4.5 mins (which is the specified trigger time):
<img width="1154" alt="Burst Time" src="https://user-images.githubusercontent.com/1044003/116997919-12f8dc00-acfb-11eb-9b0a-98387fc67560.png">

This is the job behavior during non-burst time, where it skips 2 to 3 triggers and runs once every 9 to 13.5 minutes:
<img width="1154" alt="Non Burst Time" src="https://user-images.githubusercontent.com/1044003/116998244-8b5f9d00-acfb-11eb-8340-33d47149ef81.png">

Here are some more stats from the two runs, i.e. one normal run and the other with minOffsetsPerTrigger set:

| Run | Data Size | Number of Batch Runs | Number of Files |
| ------------- | ------------- |------------- |------------- |
| Normal Run | 54.2 GB | 320 | 21968 |
| Run with minOffsetsPerTrigger | 54.2 GB | 120 | 12104 |

Closes #32653 from satishgopalani/SPARK-35312.

Authored-by: Satish Gopalani <satish.gopalani@pubmatic.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-06-08 23:48:09 +09:00
Hyukjin Kwon 921abc51cf [SPARK-35636][PYTHON][DOCS][FOLLOW-UP] Restructure reference API files according to the layout
### What changes were proposed in this pull request?

This PR proposes to restructure the API reference files according to the layout; see https://github.com/apache/spark/pull/32799. Now the pandas APIs on Spark are under a separate directory at the same level as other modules such as Spark SQL.

```bash
tree reference
```

**Before:**

```
reference
├── index.rst
├── ps_extensions.rst
├── ps_frame.rst
├── ps_general_functions.rst
├── ps_groupby.rst
├── ps_indexing.rst
├── ps_io.rst
├── ps_ml.rst
├── ps_series.rst
├── ps_window.rst
├── pyspark.ml.rst
├── pyspark.mllib.rst
├── pyspark.pandas.rst
├── pyspark.resource.rst
├── pyspark.rst
├── pyspark.sql.rst
├── pyspark.ss.rst
└── pyspark.streaming.rst
```

**After:**

```
reference
├── index.rst
├── pyspark.ml.rst
├── pyspark.mllib.rst
├── pyspark.pandas
│   ├── extensions.rst
│   ├── frame.rst
│   ├── general_functions.rst
│   ├── groupby.rst
│   ├── index.rst
│   ├── indexing.rst
│   ├── io.rst
│   ├── ml.rst
│   ├── series.rst
│   └── window.rst
├── pyspark.resource.rst
├── pyspark.rst
├── pyspark.sql.rst
├── pyspark.ss.rst
└── pyspark.streaming.rst
```

### Why are the changes needed?

To make the directory structure easier to follow.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually built and tested the docs.

Closes #32812 from HyukjinKwon/SPARK-35646-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-08 19:01:56 +09:00
Cheng Pan eee02739ed [SPARK-34290][SQL][FOLLOWUP] Cleanup truncate table not supported for V2Table error
### What changes were proposed in this pull request?

Cleanup unreachable code.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32791 from pan3793/cleanup.

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-06-08 13:24:11 +08:00
Yikun Jiang f3dc549d9c [SPARK-35668][INFRA] Use "concurrency" syntax on Github Actions workflow
### What changes were proposed in this pull request?

This patch uses the "concurrency" syntax to replace the "cancel job" workflow:
- .github/workflows/benchmark.yml
- .github/workflows/labeler.yml
- .github/workflows/notify_test_workflow.yml
- .github/workflows/test_report.yml

It also removes .github/workflows/cancel_duplicate_workflow_runs.yml.

Note that the push/schedule-based jobs are not changed, to keep the same config as in a4b70758d3:
- .github/workflows/build_and_test.yml
- .github/workflows/publish_snapshot.yml
- .github/workflows/stale.yml
- .github/workflows/update_build_status.yml

### Why are the changes needed?
We are using the [cancel_duplicate_workflow_runs](a70e66ecfa/.github/workflows/cancel_duplicate_workflow_runs.yml (L1)) job to cancel previous jobs when a new job is queued. GitHub Actions now supports this natively via the ["concurrency"](https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency) syntax, which makes sure only a single job or workflow in the same concurrency group runs at a time.

Related: https://github.com/apache/arrow/pull/10416 and https://github.com/potiuk/cancel-workflow-runs

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Triggered the PR manually.

Closes #32806 from Yikun/SPARK-X.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-08 12:10:40 +09:00
itholic 745756ca4c [SPARK-35603][R][DOCS] Add data source options link for R API documentation
### What changes were proposed in this pull request?

The options for each data source are documented on the Data Source Options page.

For Python, Scala, and Java, a link to the Data Source Options page was added to each API documentation.

- Python
<img width="732" alt="Screen Shot 2021-06-07 at 12 25 45 PM" src="https://user-images.githubusercontent.com/44108233/120955187-cbe38800-c78b-11eb-9475-ccf89bbc3c95.png">

- Scala
<img width="677" alt="Screen Shot 2021-06-07 at 12 26 41 PM" src="https://user-images.githubusercontent.com/44108233/120955186-cab25b00-c78b-11eb-9fed-3f0d2024029b.png">

- JAVA
<img width="726" alt="Screen Shot 2021-06-07 at 12 27 49 PM" src="https://user-images.githubusercontent.com/44108233/120955182-c8e89780-c78b-11eb-9cf1-13e41ba35b3e.png">

However, we have no link for R documentation, so we should add the link to the R documentation as well.

### Why are the changes needed?

To show users the available options for each data source when they read/write it.

### Does this PR introduce _any_ user-facing change?

Yes, the link to the Data Source Options page is added to the R documentation, as shown below.

<img width="855" alt="Screen Shot 2021-06-07 at 12 29 26 PM" src="https://user-images.githubusercontent.com/44108233/120955302-064d2500-c78c-11eb-8dc3-cb22dfd5fd14.png">

### How was this patch tested?

Manually built the docs and checked them one by one.

Closes #32797 from itholic/SPARK-35603.

Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-08 11:58:38 +09:00
Takuya UESHIN 04418e18d7 [SPARK-35638][PYTHON] Introduce InternalField to manage dtypes and StructFields
### What changes were proposed in this pull request?

Introduces `InternalField` to manage dtypes and `StructField`s.

`InternalFrame` is already managing dtypes, but when it checks Spark's data types, column names, and nullabilities, it runs the analysis phase each time it needs them, which causes a performance issue.

It will use the `InternalField` class, which stores the retrieved Spark data types, column names, and nullabilities, and reuses them. Also, in cases where those are already known, it just updates and reuses them without asking Spark.

### Why are the changes needed?

Currently there are some performance issues in the pandas-on-Spark layer.

One of them is accessing the Java DataFrame and running the analysis phase too many times, especially just to retrieve the current column names or data types.

We should reduce the amount of unnecessary access.

### Does this PR introduce _any_ user-facing change?

Improves the performance of the pandas-on-Spark layer:

```py
df = ps.read_parquet("/path/to/test.parquet")  # contains ~75 columns
df = df[(df["col"] > 0) & (df["col"] < 10000)]
```

Before the PR, it took about **2.15 sec** and after **1.15 sec**.

### How was this patch tested?

Existing tests.

Closes #32775 from ueshin/issues/SPARK-35638/field.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-08 11:57:28 +09:00
Xinrong Meng dfd8a8dc67 [SPARK-35341][PYTHON] Introduce BooleanExtensionOps
### What changes were proposed in this pull request?

- Introduce BooleanExtensionOps in order to make boolean operators `and` and `or` data-type-based.
- Improve error messages for operators `and` and `or`.

### Why are the changes needed?

Boolean operators `__and__`, `__or__`, `__rand__`, and `__ror__` should be data-type-based.

BooleanExtensionDtypes processes these boolean operators differently from bool, so BooleanExtensionOps is introduced.

These boolean operators are themselves also bitwise operators, which should be applicable to other data type classes later. However, that is not the goal of this PR.

### Does this PR introduce _any_ user-facing change?

Yes. Error messages for operators `and` and `or` are improved.
Before:
```
>>> psser = ps.Series([1, "x", "y"], dtype="category")
>>> psser | True
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: cannot resolve '(`0` OR true)' due to data type mismatch: differing types in '(`0` OR true)' (tinyint and boolean).;
'Project [unresolvedalias(CASE WHEN (isnull(0#9) OR isnull((0#9 OR true))) THEN false ELSE (0#9 OR true) END, Some(org.apache.spark.sql.Column$$Lambda$1442/17254916406fb8afba))]
+- Project [__index_level_0__#8L, 0#9, monotonically_increasing_id() AS __natural_order__#12L]
   +- LogicalRDD [__index_level_0__#8L, 0#9], false

```

After:
```
>>> psser = ps.Series([1, "x", "y"], dtype="category")
>>> psser | True
Traceback (most recent call last):
...
TypeError: Bitwise or can not be applied to categoricals.
```

### How was this patch tested?

Unit tests.

Closes #32698 from xinrong-databricks/datatypeops_extension.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-06-07 15:43:52 -07:00
Xinrong Meng 04a8d2cbcf [SPARK-35343][PYTHON] Make the conversion from/to pandas data-type-based for non-ExtensionDtypes
### What changes were proposed in this pull request?

Make the conversion from/to pandas (for non-ExtensionDtype) data-type-based.
NOTE: Ops class per ExtensionDtype and its data-type-based from/to pandas will be implemented in a separate PR as https://issues.apache.org/jira/browse/SPARK-35614.

### Why are the changes needed?

The conversion from/to pandas includes logic for checking data types and behaving accordingly.
That makes code hard to change or maintain.
Since we have introduced the Ops class per non-ExtensionDtype data type, we ought to make the conversion from/to pandas data-type-based for non-ExtensionDtypes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #32592 from xinrong-databricks/datatypeop_pd_conversion.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-06-07 13:12:12 -07:00
dgd-contributor 6c3b7f92cf [SPARK-35074][CORE] hardcoded configs move to config package
### What changes were proposed in this pull request?
Currently spark.jars.xxx property keys (e.g. spark.jars.ivySettings and spark.jars.packages) are hardcoded in multiple places within Spark code across multiple modules. We should define them in config/package.scala and reference them in all other places.

### Why are the changes needed?
improvement

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
no

Closes #32746 from dgd-contributor/SPARK-35074_configs_should_be_moved_to_config_package.scala.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2021-06-07 09:55:03 -05:00
Gengliang Wang 33f26275f4 [SPARK-35663][SQL] Add Timestamp without time zone type
### What changes were proposed in this pull request?

Extend Catalyst's type system with a new type that conforms to the SQL standard (see SQL:2016, section 4.6.2): TimestampWithoutTZType, which represents the timestamp without time zone type.

### Why are the changes needed?

Spark SQL today supports the TIMESTAMP data type. However the semantics provided actually match TIMESTAMP WITH LOCAL TIMEZONE as defined by Oracle. Timestamps embedded in a SQL query or passed through JDBC are presumed to be in session local timezone and cast to UTC before being processed.
These are desirable semantics in many cases, such as when dealing with calendars.
In many (more) other cases, such as when dealing with log files it is desirable that the provided timestamps not be altered.
SQL users expect that they can model either behavior and do so by using TIMESTAMP WITHOUT TIME ZONE for time zone insensitive data and TIMESTAMP WITH LOCAL TIME ZONE for time zone sensitive data.
Most traditional RDBMSs map TIMESTAMP to TIMESTAMP WITHOUT TIME ZONE, and users will be surprised to see TIMESTAMP WITH LOCAL TIME ZONE, a feature that does not exist in the standard.

In this new feature, we will introduce TIMESTAMP WITH LOCAL TIME ZONE to describe the existing timestamp type and add TIMESTAMP WITHOUT TIME ZONE for the standard semantics.
Using these two types will provide clarity.
This is a starting PR. See more details in https://issues.apache.org/jira/browse/SPARK-35662

### Does this PR introduce _any_ user-facing change?

Yes, a new data type for Timestamp without time zone type. It is still in development.

### How was this patch tested?

Unit test

Closes #32802 from gengliangwang/TimestampNTZType.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-07 14:21:31 +00:00
attilapiros 4534c0c4df [SPARK-35543][CORE] Fix memory leak in BlockManagerMasterEndpoint removeRdd
### What changes were proposed in this pull request?

In `BlockManagerMasterEndpoint`, for disk-persisted RDDs (when `spark.shuffle.service.fetch.rdd.enable` is enabled), we keep track of the block status entries by external shuffle service instance (so on YARN we basically keep them by node). This is the `blockStatusByShuffleService` member val. When all the RDD blocks are removed for one external shuffle service instance, the key and its now-empty map can be removed from `blockStatusByShuffleService`.
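
The shape of the cleanup can be sketched with a simplified, self-contained data structure (hypothetical map layout, not the actual `BlockManagerMasterEndpoint` code): after dropping an RDD's blocks, any entry whose inner map became empty is also dropped, so the outer map stops leaking keys.

```scala
import scala.collection.mutable

object BlockStatusCleanupSketch {
  // shuffle service instance (node) -> (rddId -> number of cached blocks)
  val blockStatusByShuffleService =
    mutable.HashMap.empty[String, mutable.HashMap[Int, Long]]

  def removeRdd(rddId: Int): Unit = {
    blockStatusByShuffleService.values.foreach(_.remove(rddId))
    val emptyNodes = blockStatusByShuffleService
      .collect { case (node, blocks) if blocks.isEmpty => node }
      .toList
    emptyNodes.foreach(blockStatusByShuffleService.remove)  // drop the now-empty entries
  }
}
```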

### Why are the changes needed?

It is a small leak and I was asked to take care of it in https://github.com/apache/spark/pull/32114#discussion_r640270377.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually, by adding a temporary log line to check the `blockStatusByShuffleService` value before and after `removeRdd`, and by running the `SPARK-25888: using external shuffle service fetching disk persisted blocks` test in `ExternalShuffleServiceSuite`.

Closes #32790 from attilapiros/SPARK-35543.

Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
2021-06-07 15:37:19 +02:00
Wenchen Fan a70e66ecfa [SPARK-35665][SQL] Resolve UnresolvedAlias in CollectMetrics
### What changes were proposed in this pull request?

It's a long-standing bug that we forgot to resolve `UnresolvedAlias` in `CollectMetrics`. It's a bit hard to trigger this bug before 3.2, as people most likely won't create `UnresolvedAlias` when calling `Dataset.observe`. However, things have changed after https://github.com/apache/spark/pull/30974.

This PR proposes to handle `CollectMetrics` in the rule `ResolveAliases`.
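
For context, a hedged usage sketch of `Dataset.observe` (assumes a running `SparkSession`): the named metric expressions end up in a `CollectMetrics` node, which is where the alias resolution added here applies.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("observe-demo").getOrCreate()

// The observed metrics are wrapped in a CollectMetrics plan node.
val observed = spark.range(100)
  .observe("stats", count(lit(1)).as("rows"), max(col("id")).as("max_id"))
observed.collect()   // metrics are reported to query listeners after execution
```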

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test

Closes #32803 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-07 21:05:11 +09:00
Hyukjin Kwon 7ce7aa4758 [SPARK-35646][PYTHON][DOCS] Relocate pandas-on-Spark API references in documentation
### What changes were proposed in this pull request?

This PR proposes to change from:

![Screen Shot 2021-06-07 at 1 40 47 PM](https://user-images.githubusercontent.com/6477701/120960027-fc302400-c795-11eb-96fb-73ac1d8277fe.png)

to:

![Screen Shot 2021-06-07 at 1 41 19 PM](https://user-images.githubusercontent.com/6477701/120960074-0fdb8a80-c796-11eb-87ec-69a30692fdfe.png)

### Why are the changes needed?

pandas APIs on Spark (pandas on Spark) is ultimately a package in PySpark, so it has to be documented at the same level as other packages (e.g., Spark SQL).

### Does this PR introduce _any_ user-facing change?

Yes, it changes the structure of the docs. For end users, no, as it's only in the development branch.

### How was this patch tested?

Manually tested as above.

Closes #32799 from HyukjinKwon/SPARK-35646.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-07 16:37:58 +09:00
Dongjoon Hyun 6f2ffccb5e [SPARK-35660][BUILD][K8S] Upgrade kubernetes-client to 5.4.1
### What changes were proposed in this pull request?

This PR aims to upgrade kubernetes-client to 5.4.1.

### Why are the changes needed?

This will bring a few bug fixes.
- https://github.com/fabric8io/kubernetes-client/releases/tag/v5.4.1

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #32798 from dongjoon-hyun/SPARK-35660.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-06 22:27:08 -07:00
Ruifeng Zheng ffc61c6af0 [SPARK-35619][ML] Refactor LinearRegression - make huber support virtual centering
### What changes were proposed in this pull request?
1. For huber, make it support virtual centering.
2. For `LeastSquares`, it was implemented this way: optimize the linear part and then estimate the intercept. So this just reorganizes its aggregator and test suite to keep in line with the other aggregators.

### Why are the changes needed?
for better convergence

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing and newly added testsuites

Closes #32759 from zhengruifeng/refactor_huber_agg.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2021-06-07 11:26:23 +08:00
Ruifeng Zheng d8e37a8e40 [SPARK-35100][ML][FOLLOWUP] AFT cleanup
### What changes were proposed in this pull request?
remove an inaccessible code path

### Why are the changes needed?
for succinctness

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing testsuite

Closes #32765 from zhengruifeng/spark_35100_followup.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2021-06-07 11:23:26 +08:00
Xinrong Meng 50f7686de9 [SPARK-35599][PYTHON] Adjust check_exact parameter for older pd.testing
### What changes were proposed in this pull request?

Adjust the `check_exact` parameter for non-numeric columns to ensure pandas-on-Spark tests pass with all pandas versions.

### Why are the changes needed?

`pd.testing` utils are utilized in pandas-on-Spark tests.
Due to https://github.com/pandas-dev/pandas/issues/35446, `check_exact=True` for non-numeric columns doesn't work for older pd.testing utils, e.g. `assert_series_equal`.  We wanted to adjust that to ensure pandas-on-Spark tests pass for all pandas versions.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32772 from xinrong-databricks/test_util.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-07 11:12:49 +09:00
itholic b8740a1d1e [SPARK-35499][PYTHON] Apply black to pandas API on Spark codes
### What changes were proposed in this pull request?

This PR proposes applying `black` to pandas API on Spark codes, for improving static analysis.

By executing the `./dev/reformat-python` in the spark home directory, all the code of the pandas API on Spark is fixed according to the static analysis rules.

### Why are the changes needed?

This reduces the cost of static analysis during development.

It has been used continuously for about a year in the Koalas project and its convenience has been proven.

### Does this PR introduce _any_ user-facing change?

No, it's dev-only.

### How was this patch tested?

Manually reformat the pandas API on Spark codes by running the `./dev/reformat-python`, and checked the `./dev/lint-python` is passed.

Closes #32779 from itholic/SPARK-35499.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-06 17:30:07 -07:00
Dongjoon Hyun d4e32c896a [SPARK-35654][CORE] Allow ShuffleDataIO control DiskBlockManager.deleteFilesOnStop
### What changes were proposed in this pull request?

This PR aims to change `DiskBlockManager` like the following to allow `ShuffleDataIO` to decide the behavior of shuffle file deletion.
```scala
- private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
+ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Boolean)
```

### Why are the changes needed?

`SparkContext`
1. creates `SparkEnv` (with `BlockManager` and its `DiskBlockManager`),
2. loads `ShuffleDataIO`, and
3. initializes the block manager:
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)

...
_shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
    }
...

_env.blockManager.initialize(_applicationId)
...
```

`DiskBlockManager` is created first, in the `BlockManager` constructor, and we cannot change `deleteFilesOnStop` later from `ShuffleDataIO`. By switching to a `var`, we can implement enhanced shuffle data management features via `ShuffleDataIO`, like https://github.com/apache/spark/pull/32730.
```
  val diskBlockManager = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }
```

### Does this PR introduce _any_ user-facing change?

No. This is a private class.

### How was this patch tested?

N/A

Closes #32784 from dongjoon-hyun/SPARK-35654.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-06 09:20:42 -07:00
Kousuke Saruta 5e30666010 [SPARK-35656][BUILD] Upgrade SBT to 1.5.3
### What changes were proposed in this pull request?

This PR proposes to upgrade SBT to 1.5.3.

### Why are the changes needed?

This release seems to include a bug fix for Scala 2.13.6+ and Scala 3.
https://github.com/sbt/sbt/releases/tag/v1.5.3

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes #32792 from sarutak/upgrade-sbt-1.5.3.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-05 16:48:59 -07:00
Alkis Polyzotis 6f8c62047c [SPARK-35558] Optimizes for multi-quantile retrieval
### What changes were proposed in this pull request?
Optimizes the retrieval of approximate quantiles for an array of percentiles.
* Adds an overload for QuantileSummaries.query that accepts an array of percentiles and optimizes the computation to do a single pass over the sketch and avoid redundant computation.
* Modifies the ApproximatePercentiles operator to call into the new method.

All formatting changes are the result of running ./dev/scalafmt

### Why are the changes needed?
The existing implementation does repeated calls per input percentile resulting in redundant computation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests for the new method.

Closes #32700 from alkispoly-db/spark_35558_approx_quants_array.

Authored-by: Alkis Polyzotis <alkis.polyzotis@databricks.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-06-05 14:25:33 -05:00
Kousuke Saruta 510bde460a [SPARK-35655][BUILD] Upgrade HtmlUnit and its related artifacts to 2.50
### What changes were proposed in this pull request?

This PR upgrades HtmlUnit and its related artifacts to `2.50`.

### Why are the changes needed?

We currently use 2.40, but 2.41+ seems to include a bunch of bug fixes and improvements, especially for JavaScript and CSS.
https://htmlunit.sourceforge.io/changes-report.html#a2.50.0

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

All the `UISeleniumSuite` tests, which use HtmlUnit, pass on my laptop.

Closes #32789 from sarutak/upgrade-htmlunit250.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-05 17:45:44 +08:00
Yingyi Bu 7bc364beed [SPARK-35621][SQL] Add rule id pruning to the TypeCoercion rule
### What changes were proposed in this pull request?

- Added TreeNode.transformUpWithBeforeAndAfterRuleOnChildren(...);
- Call transformUpWithBeforeAndAfterRuleOnChildren in TypeCoercionRule.

### Why are the changes needed?

Reduce the number of tree traversals and hence improve the query compilation latency.

### How was this patch tested?

Existing tests.
Performance diff:

| Rule | Baseline | Experiment (wo. ruleId) | Experiment (wo. ruleId)/Baseline | Experiment (w. ruleId) | Experiment (w. ruleId)/Baseline |
| -- | -- | -- | -- | -- | -- |
| CombinedTypeCoercionRule | 665020354 | 567320034 | 0.85 | 330798240 | 0.50 |

Closes #32761 from sigmod/transform.

Authored-by: Yingyi Bu <yingyi.bu@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-05 14:49:16 +08:00
Marios Meimaris b5678bee1e [SPARK-35446] Override getJDBCType in MySQLDialect to map FloatType to FLOAT
### What changes were proposed in this pull request?

Override `getJDBCType` method in `MySQLDialect` so that `FloatType` is mapped to `FLOAT` instead of `REAL`

### Why are the changes needed?

MySQL treats `REAL` as a synonym for `DOUBLE` by default (see https://dev.mysql.com/doc/refman/8.0/en/numeric-types.html). Therefore, when creating a table with a column of `REAL` type, it will be created as `DOUBLE`. However, currently, `MySQLDialect` does not provide an implementation for `getJDBCType`, and will thus ultimately fall back to `JdbcUtils.getCommonJDBCType`, which maps `FloatType` to `REAL`. This change is needed so that we can properly map `FloatType` to `FLOAT` for MySQL.
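
The actual change lives inside Spark's built-in `MySQLDialect`; a custom dialect with the same mapping might look roughly like the sketch below (illustrative only, not the PR's code).

```scala
import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, FloatType}

object FloatAsFloatMySQLDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case FloatType => Some(JdbcType("FLOAT", Types.FLOAT))  // instead of the common "REAL"
    case _         => None                                  // fall back to the defaults
  }
}

// A custom dialect would be activated via JdbcDialects.registerDialect(FloatAsFloatMySQLDialect).
```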

### Does this PR introduce _any_ user-facing change?
Prior to this PR, when writing a dataframe with a `FloatType` column to a MySQL table, it will create a `DOUBLE` column. After the PR, it will create a `FLOAT` column.

### How was this patch tested?
Added a test case in `JDBCSuite` that verifies the mapping.

Closes #32605 from mariosmeim-db/SPARK-35446.

Authored-by: Marios Meimaris <marios.meimaris@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-05 12:44:16 +09:00
Keerthan Vasist f2c0a049a6 [SPARK-35643][PYTHON] Fix ambiguous reference in functions.py column()
### What changes were proposed in this pull request?
In functions.py, a function `def column(col)` was added. There is also another function in the same file, `def col(col)`. This leads to ambiguity over whether the parameter or the function is being referred to. In PySpark 3.1.2, this leads to `TypeError: 'str' object is not callable` when the function `column(col)` is called: the highest preference is given to the string variable in scope as opposed to the function `col` in the file, as intended.

This PR fixes that ambiguity by changing the variable name to `col_like`. I have filed this as an issue on JIRA here - https://issues.apache.org/jira/browse/SPARK-35643.

### Why are the changes needed?
In PySpark 3.1.2, we see `TypeError: 'str' object is not callable` when the `column()` function is called. This PR fixes that error.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
I don't believe this patch needs additional testing.

Closes #32771 from keerthanvasist/col.

Lead-authored-by: Keerthan Vasist <kvasist@amazon.com>
Co-authored-by: keerthanvasist <kvasist@amazon.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-05 12:40:39 +09:00
Chris Wu 497c80a1ad [SPARK-32975][K8S] Add config for driver readiness timeout before executors start
### What changes were proposed in this pull request?
Add a new config that controls how long to wait for the driver pod's readiness before allocating executor pods. This wait only happens once, at application start.

### Why are the changes needed?
The driver's headless service can be resolved by DNS only after the driver pod is ready. If an executor tries to connect to the headless service before the driver pod is ready, it will hit an UnknownHostException and get into an error state, but it will not be restarted. **This case usually happens when the driver pod has sidecar containers but hasn't finished their creation when executors start.** So basically there is a race condition. This issue can be mitigated by tweaking this config.

### Does this PR introduce _any_ user-facing change?
A new config `spark.kubernetes.allocation.driver.readinessTimeout` added.
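
A hedged usage sketch of setting the new config when building the `SparkConf` for a Kubernetes deployment (the master URL and timeout value are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")
  .set("spark.kubernetes.allocation.driver.readinessTimeout", "60s")  // wait up to 60s for driver readiness
```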

### How was this patch tested?
Existing tests.

Closes #32752 from cchriswu/SPARK-32975-fix.

Lead-authored-by: Chris Wu <wucaowei19@gmail.com>
Co-authored-by: Chris Wu <wcaowei@vmware.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-04 06:59:49 -07:00
Kent Yao dc3317fdf9 [SPARK-21957][SQL][FOLLOWUP] Support CURRENT_USER without tailing parentheses
### What changes were proposed in this pull request?

A followup for 345d35ed1a: in this PR we support CURRENT_USER without trailing parentheses in default mode. For ANSI mode, we can only use CURRENT_USER without trailing parentheses, because it is a reserved keyword that cannot be used as a function name.
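
A hedged usage sketch (assumes a running `SparkSession`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("current-user-demo").getOrCreate()
spark.sql("SELECT CURRENT_USER").show()   // no trailing parentheses, same style as CURRENT_DATE
```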

### Why are the changes needed?

1. make it the same as current_date/current_timestamp
2. better ANSI compliance
### Does this PR introduce _any_ user-facing change?

no, just a followup

### How was this patch tested?

new tests

Closes #32770 from yaooqinn/SPARK-21957-F.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-04 13:32:56 +00:00
Ke Jia 6ce5f2491c [SPARK-35568][SQL] Add the BroadcastExchange after re-optimizing the physical plan to fix the UnsupportedOperationException when enabling both AQE and DPP
### What changes were proposed in this pull request?
This PR is to fix the `UnsupportedOperationException` described in [PR#32705](https://github.com/apache/spark/pull/32705).
When AQE and DPP are turned on at the same time, the `BroadcastExchange` included in the DPP filter is not added through the `EnsureRequirement` rule. Therefore, when AQE re-optimizes the DPP filter, there is no way to add the `BroadcastExchange` through the `EnsureRequirement` rule in the `reOptimize` method, which eventually leads to the loss of the `BroadcastExchange` in the final physical plan. This PR adds a `BroadcastExchange` node in the `reOptimize` method if the current plan is a DPP filter.
### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
adding new ut

Closes #32741 from JkSelf/fixDPP+AQEbug.

Authored-by: Ke Jia <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-04 13:29:36 +00:00
ulysses-you c7fb0e18be [SPARK-35629][SQL] Use better exception type if database doesn't exist on drop database
### What changes were proposed in this pull request?

Add a database existence check in `SessionCatalog`.

### Why are the changes needed?

Currently, executing `drop database test` when the database does not exist throws an unfriendly error message:

```
Error in query: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
	at org.apache.spark.sql.hive.HiveExternalCatalog.dropDatabase(HiveExternalCatalog.scala:200)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.dropDatabase(ExternalCatalogWithListener.scala:53)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.dropDatabase(SessionCatalog.scala:273)
	at org.apache.spark.sql.execution.command.DropDatabaseCommand.run(ddl.scala:111)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3707)
```
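
A self-contained sketch of the friendlier check (hypothetical catalog trait and exception, not the real `SessionCatalog` API):

```scala
object DropDatabaseSketch {
  class NoSuchDatabaseException(db: String)
    extends RuntimeException(s"Database '$db' not found")

  trait Catalog {
    def databaseExists(db: String): Boolean
    def dropDatabaseInMetastore(db: String): Unit
  }

  def dropDatabase(catalog: Catalog, db: String, ignoreIfNotExists: Boolean): Unit = {
    if (!catalog.databaseExists(db)) {
      // Fail early with a clear error instead of surfacing the raw
      // Hive metastore NoSuchObjectException shown above.
      if (!ignoreIfNotExists) throw new NoSuchDatabaseException(db)
    } else {
      catalog.dropDatabaseInMetastore(db)
    }
  }
}
```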

### Does this PR introduce _any_ user-facing change?

Yes, a cleaner error message.

### How was this patch tested?

Add test.

Closes #32768 from ulysses-you/SPARK-35629.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-06-04 15:52:21 +08:00
Karen Feng 53a758b51b [SPARK-35636][SQL] Lambda keys should not be referenced outside of the lambda function
### What changes were proposed in this pull request?

Sets `references` for `NamedLambdaVariable` and `LambdaFunction`.

| Expression  | NamedLambdaVariable | LambdaFunction |
| --- | --- | --- |
| References before | None | All function references |
| References after | self.toAttribute | Function references minus arguments' references |

In `NestedColumnAliasing`, this means that `ExtractValue(ExtractValue(attr, lv: NamedLambdaVariable), ...)` now references both `attr` and `lv`, rather than just `attr`. As a result, it will not be included in the nested column references.

### Why are the changes needed?

Before, the lambda key was referenced outside of the lambda function.

#### Example 1

Before:
```
Project [transform(keys#0, lambdafunction(_extract_v1#0, lambda key#0, false)) AS a#0]
+- 'Join Cross
   :- Project [kvs#0[lambda key#0].v1 AS _extract_v1#0]
   :  +- LocalRelation <empty>, [kvs#0]
   +- LocalRelation <empty>, [keys#0]
```

After:
```
Project [transform(keys#418, lambdafunction(kvs#417[lambda key#420].v1, lambda key#420, false)) AS a#419]
+- Join Cross
   :- LocalRelation <empty>, [kvs#417]
   +- LocalRelation <empty>, [keys#418]
```

#### Example 2

Before:
```
Project [transform(keys#0, lambdafunction(kvs#0[lambda key#0].v1, lambda key#0, false)) AS a#0]
+- GlobalLimit 5
  +- LocalLimit 5
    +- Project [keys#0, _extract_v1#0 AS _extract_v1#0]
      +- GlobalLimit 5
        +- LocalLimit 5
          +- Project [kvs#0[lambda key#0].v1 AS _extract_v1#0, keys#0]
            +- LocalRelation <empty>, [kvs#0, keys#0]
```

After:
```
Project [transform(keys#428, lambdafunction(kvs#427[lambda key#430].v1, lambda key#430, false)) AS a#429]
+- GlobalLimit 5
  +- LocalLimit 5
    +- Project [keys#428, kvs#427]
      +- GlobalLimit 5
        +- LocalLimit 5
          +- LocalRelation <empty>, [kvs#427, keys#428]
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Scala unit tests for the examples above

Closes #32773 from karenfeng/SPARK-35636.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 15:44:32 +09:00
Hyukjin Kwon 807b4006ca [SPARK-35648][PYTHON] Refine and add dependencies needed for dev in dev/requirement.txt
### What changes were proposed in this pull request?

This PR proposes to update `dev/requirement.txt` file.

NOTE that:
- This file isn't used anywhere in Apache Spark CI. It's just for convenience.
- To minimize the maintenance overhead, I removed all lower bounds on dependencies, which means that using the latest versions of them should work in a clean environment (e.g., you can reinstall all of them).

### Why are the changes needed?

To note the dependencies needed for Spark dev, and to make environment setup easier.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Logically derived from setup.py, and other places like CI

Closes #32780 from HyukjinKwon/SPARK-35648.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 15:02:29 +09:00
itholic 7c32415669 [SPARK-35523] Fix the default value in Data Source Options page
### What changes were proposed in this pull request?

This PR proposes to fix the default values on the Data Source Options page based on the Scala documentation.

### Why are the changes needed?

Some of the existing default values on the Data Source Options page follow the Python documentation, which has `None` as the default value for all options.

### Does this PR introduce _any_ user-facing change?

Yes, the default values on the Data Source Options page are fixed (from `None` to the proper default values).

- Before
<img width="361" alt="Screen Shot 2021-06-02 at 6 31 12 PM" src="https://user-images.githubusercontent.com/44108233/120456594-b8719f00-c3d0-11eb-9778-071ab2ba9f45.png">

- After
<img width="562" alt="Screen Shot 2021-06-02 at 6 32 47 PM" src="https://user-images.githubusercontent.com/44108233/120456844-f1117880-c3d0-11eb-9c7c-9dcd66776444.png">

### How was this patch tested?

Manually built the docs and checked one by one.

Closes #32745 from itholic/SPARK-35523.

Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 14:08:13 +09:00
Wenchen Fan 63ab38f917 [SPARK-35396][CORE][FOLLOWUP] Free memory entry immediately
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32534, and proposes to free the memory entry immediately instead of doing it asynchronously in the background. The reason is:
1. It's a bit weird to free the resource in an asynchronous way.
2. We free the off-heap memory entry in the same thread, and it's better to be consistent with it.
3. We can simplify the code quite a bit.

This PR also simplifies the tests to reuse the class definition.

### Why are the changes needed?

code simplification

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #32743 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-03 21:54:27 -07:00
Takuya UESHIN 221553c204 [SPARK-35642][INFRA] Split pyspark-pandas tests to rebalance the test duration
### What changes were proposed in this pull request?

Splits some of the `pyspark-pandas` module tests into separate test slots to rebalance the test duration.

Picked the top 12 tests from the previous runs and the total times are almost even.

### Why are the changes needed?

Currently the `pyspark-pandas` module tests take a long time, so we should rebalance them.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32778 from ueshin/issues/SPARK-35642/split-pandas-on-spark-tests.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 12:52:52 +09:00
Hyukjin Kwon 3d158f9c91 [SPARK-35587][PYTHON][DOCS] Initial porting of Koalas documentation
### What changes were proposed in this pull request?

This PR proposes to port Koalas documentation to PySpark documentation as its initial step.
It ports them almost as-is, except for these differences:

- Renamed import from `databricks.koalas` to `pyspark.pandas`.
- Renamed `to_koalas` -> `to_pandas_on_spark`
- Renamed `(Series|DataFrame).koalas` -> `(Series|DataFrame).pandas_on_spark`
- Added a `ps_` prefix in the RST file names of Koalas documentation

Other then that,

- Excluded `python/docs/build/html` in linter
- Fixed GA dependency installation

### Why are the changes needed?

To document pandas APIs on Spark.

### Does this PR introduce _any_ user-facing change?

Yes, it adds new documentations.

### How was this patch tested?

Manually built the docs and checked the output.

Closes #32726 from HyukjinKwon/SPARK-35587.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 11:11:09 +09:00
Dongjoon Hyun 745bd090f7 [SPARK-35589][CORE][TESTS][FOLLOWUP] Remove the duplicated test coverage
### What changes were proposed in this pull request?

This removes the accidental duplicated test coverage.

### Why are the changes needed?

To save the test resources.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A because this is a removal of the duplicated test coverage.

Closes #32774 from dongjoon-hyun/SPARK-35589-3.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-06-04 10:28:12 +09:00
Xinrong Meng 7eeb07d0f9 [SPARK-35606][PYTHON][INFRA] List Python 3.9 installed libraries in build_and_test workflow
### What changes were proposed in this pull request?

In the build_and_test workflow, tests are run against both Python 3.6 and Python 3.9. However, only libraries installed in Python 3.6 are listed. We should list Python 3.9's installed libraries as well.

### Why are the changes needed?

Listing Python 3.9's installed libraries is helpful for debugging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual check.

Closes #32737 from xinrong-databricks/ci_py3.9lib.

Lead-authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Co-authored-by: xinrong-databricks <47337188+xinrong-databricks@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-03 14:24:53 -07:00
fornaix 878527d9fa [SPARK-35612][SQL] Support LZ4 compression in ORC data source
### What changes were proposed in this pull request?

This PR aims to support LZ4 compression in the ORC data source.

### Why are the changes needed?

Apache ORC supports LZ4 compression, but we cannot set LZ4 compression in the ORC data source

**BEFORE**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
java.lang.IllegalArgumentException: Codec [lz4] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none, zstd.
```

**AFTER**

```scala
scala> spark.range(10).write.option("compression", "lz4").orc("/tmp/lz4")
```
```bash
$ orc-tools meta /tmp/lz4
Processing data file file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc [length: 222]
Structure for file:/tmp/lz4/part-00000-6a244eee-b092-4c79-a977-fb8a69dde2eb-c000.lz4.orc
File Version: 0.12 with ORC_517
Rows: 10
Compression: LZ4
Compression size: 262144
Type: struct<id:bigint>

Stripe Statistics:
  Stripe 1:
    Column 0: count: 10 hasNull: false
    Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

File Statistics:
  Column 0: count: 10 hasNull: false
  Column 1: count: 10 hasNull: false bytesOnDisk: 7 min: 0 max: 9 sum: 45

Stripes:
  Stripe: offset: 3 data: 7 rows: 10 tail: 35 index: 35
    Stream: column 0 section ROW_INDEX start: 3 length 11
    Stream: column 1 section ROW_INDEX start: 14 length 24
    Stream: column 1 section DATA start: 38 length 7
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2

File length: 222 bytes
Padding length: 0 bytes
Padding ratio: 0%

User Metadata:
  org.apache.spark.version=3.2.0
```
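
As an aside, a minimal sketch of setting the codec once per session through the existing `spark.sql.orc.compression.codec` configuration rather than a per-write option; this assumes `lz4` is accepted there after this change, and the path is illustrative:

```scala
// Sketch only: assumes lz4 is a valid value for this conf once the patch is applied.
spark.conf.set("spark.sql.orc.compression.codec", "lz4")
spark.range(10).write.orc("/tmp/lz4_session")
```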

### Does this PR introduce _any_ user-facing change?

Yes, `lz4` becomes an accepted value for the ORC `compression` option.

### How was this patch tested?

Pass the newly added test case.

Closes #32751 from fornaix/spark-35612.

Authored-by: fornaix <foxnaix@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-06-03 14:07:26 -07:00
Liang-Chi Hsieh 0342dcb628 [SPARK-35580][SQL] Implement canonicalized method for HigherOrderFunction
### What changes were proposed in this pull request?

This patch implements the `canonicalized` method for `HigherOrderFunction`. Basically, it canonicalizes the names and `ExprId`s of all `NamedLambdaVariable`s. The name and `ExprId` of a `NamedLambdaVariable` are unique, so two otherwise identical `HigherOrderFunction`s never compare as equal; by canonicalizing them we can compare `HigherOrderFunction`s for semantic equality.
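
To make the idea concrete, here is a toy, self-contained sketch of this kind of canonicalization (alpha-renaming the bound variable to a fixed placeholder). It only illustrates the concept and is not the actual Catalyst implementation:

```scala
// Toy model: two lambdas that differ only in the name/id of their bound variable
// become equal once that variable is rewritten to a fixed placeholder.
sealed trait Expr
case class Var(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Lambda(param: Var, body: Expr) extends Expr

def canonicalize(f: Lambda): Lambda = {
  def rewrite(e: Expr): Expr = e match {
    case v: Var if v == f.param => Var("_canonical")
    case Add(l, r)              => Add(rewrite(l), rewrite(r))
    case other                  => other
  }
  Lambda(Var("_canonical"), rewrite(f.body))
}

val f1 = Lambda(Var("x_20"), Add(Var("x_20"), Lit(1)))
val f2 = Lambda(Var("x_21"), Add(Var("x_21"), Lit(1)))
assert(f1 != f2)                              // variable names differ, so structural equality fails
assert(canonicalize(f1) == canonicalize(f2))  // equal after canonicalization
```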

### Why are the changes needed?

The default `canonicalized` method does not work for `HigherOrderFunction`, which prevents subexpression elimination from kicking in for higher-order functions.

I manually checked the generated code for:
```scala
val df = Seq(Seq(1, 2, 3)).toDF("a")
df.select(transform($"a", x => x + 1), transform($"a", x => x + 1)).collect()
```

Below is the code for `transform(input[0, array<int>, true], lambdafunction((lambda x_20#19041 + 1), lambda x_20#19041, false)),transform(input[0, array<int>, true], lambdafunction((lambda x_21#19042 + 1), lambda x_21#19042, false))`, as generated by `GenerateUnsafeProjection`.

Before:

```java
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
/* 028 */   public UnsafeRow apply(InternalRow i) {
...
/* 034 */     Object obj_0 = ((Expression) references[0]).eval(i);
...
/* 062 */     Object obj_1 = ((Expression) references[1]).eval(i);
...
/* 093 */ }
```

After:
```java
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
/* 031 */   public UnsafeRow apply(InternalRow i) {
...
/* 033 */     subExpr_0(i);
...
/* 086 */   private void subExpr_0(InternalRow i) {
/* 087 */     Object obj_0 = ((Expression) references[0]).eval(i);
/* 088 */     boolean isNull_0 = obj_0 == null;
/* 089 */     ArrayData value_0 = null;
/* 090 */     if (!isNull_0) {
/* 091 */       value_0 = (ArrayData) obj_0;
/* 092 */     }
/* 093 */     subExprIsNull_0 = isNull_0;
/* 094 */     mutableStateArray_0[0] = value_0;
/* 095 */   }
/* 096 */
/* 097 */ }
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests and a manual check of the generated code.

Closes #32735 from viirya/higher-func-canonicalize.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-06-03 09:16:47 -07:00
Dongjoon Hyun 4f0db872a0 [SPARK-35416][K8S][FOLLOWUP] Use Set instead of ArrayBuffer
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/32564 .

### Why are the changes needed?

To use `Set` instead of `ArrayBuffer` and to add an explicit return type.
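
Purely as an illustration of the refactor style (the helper below is hypothetical and not the actual K8s code touched by this follow-up):

```scala
import scala.collection.mutable.ArrayBuffer

// Before-style sketch: appending to an ArrayBuffer, no explicit return type.
def volumeNamesBefore(specs: Seq[(String, String)]) = {
  val buf = new ArrayBuffer[String]()
  specs.foreach { case (name, _) => buf += name }
  buf
}

// After-style sketch: distinct names collected into a Set, with an explicit return type.
def volumeNamesAfter(specs: Seq[(String, String)]): Set[String] =
  specs.map { case (name, _) => name }.toSet
```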

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #32758 from dongjoon-hyun/SPARK-35416-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-06-03 10:41:11 -05:00
Fu Chen cfde117c6f [SPARK-35316][SQL] UnwrapCastInBinaryComparison support In/InSet predicate
### What changes were proposed in this pull request?

This PR adds `In`/`InSet` predicate support to `UnwrapCastInBinaryComparison`.

The current implementation doesn't push down filters for `In`/`InSet` predicates that contain a `Cast`.

For instance:

```scala
spark.range(50).selectExpr("cast(id as int) as id").write.mode("overwrite").parquet("/tmp/parquet/t1")
spark.read.parquet("/tmp/parquet/t1").where("id in (1L, 2L, 4L)").explain
```

Before this PR:

```
== Physical Plan ==
*(1) Filter cast(id#5 as bigint) IN (1,2,4)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#5] Batched: true, DataFilters: [cast(id#5 as bigint) IN (1,2,4)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/parquet/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

After this PR:

```
== Physical Plan ==
*(1) Filter id#95 IN (1,2,4)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#95] Batched: true, DataFilters: [id#95 IN (1,2,4)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/parquet/t1], PartitionFilters: [], PushedFilters: [In(id, [1,2,4])], ReadSchema: struct<id:int>
```
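
A related sketch with illustrative values: once the list is longer than `spark.sql.optimizer.inSetConversionThreshold` literals (10 by default), the optimizer plans it as `InSet` rather than `In`, which this change also unwraps.

```scala
// Illustrative only: a long enough IN list is converted to InSet by the optimizer.
spark.read.parquet("/tmp/parquet/t1")
  .where("id in (1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L)")
  .explain()
```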

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32488 from cfmcgrady/SPARK-35316.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-06-03 14:45:17 +00:00
Kousuke Saruta c532f8260e [SPARK-35609][BUILD] Add style rules to prohibit to use a Guava's API which is incompatible with newer versions
### What changes were proposed in this pull request?

This PR adds rules to `checkstyle.xml` and `scalastyle-config.xml` to avoid reintroducing `Objects.toStringHelper`, a Guava API which is no longer present in newer Guava versions.

### Why are the changes needed?

SPARK-30272 (#26911) replaced `Objects.toStringHelper`, an API that Guava 14 provides, with the `commons.lang3` API because `Objects.toStringHelper` is no longer present in newer Guava versions.
But `toStringHelper` was introduced into Spark again and had to be replaced once more in SPARK-35420 (#32567).
I think it's better to have a style rule to avoid repeating this.

SPARK-30272 replaced some other APIs aside from `Objects.toStringHelper`, but `Objects.toStringHelper` seems to be the one affecting Spark for now, so I add rules only for it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that `lint-java` and `lint-scala` detect the usage of `toStringHelper` and make the lint check fail.
```
$ dev/lint-java
exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.14/scala-2.12.14.tgz
Using `mvn` from path: /opt/maven/3.6.3//bin/mvn
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/network/protocol/OneWayMessage.java:[78] (regexp) RegexpSinglelineJava: Avoid using Object.toStringHelper. Use ToStringBuilder instead.

$ dev/lint-scala
Scalastyle checks failed at following occurrences:
[error] /home/kou/work/oss/spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala:93:25: Avoid using Object.toStringHelper. Use ToStringBuilder instead.
[error] Total time: 25 s, completed 2021/06/02 16:18:25
```
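
For context, a rough illustration of the call the new rules reject and the commons-lang3 replacement the error message points to; the class below is hypothetical and not taken from the Spark codebase:

```scala
// Hypothetical example only. The flagged pattern is the Guava 14 helper:
//   com.google.common.base.Objects.toStringHelper(this).add("body", body).toString
// which newer Guava no longer provides. The lint message suggests ToStringBuilder instead:
import org.apache.commons.lang3.builder.ToStringBuilder

class Message(val body: String) {
  override def toString: String =
    new ToStringBuilder(this).append("body", body).toString
}
```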

Closes #32740 from sarutak/style-rule-for-guava.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-06-03 21:52:41 +09:00