Commit graph

262 commits

Author SHA1 Message Date
Hyukjin Kwon 20750a3f9e [SPARK-32194][PYTHON] Use proper exception classes instead of plain Exception
### What changes were proposed in this pull request?

This PR proposes to use a proper built-in exceptions instead of the plain `Exception` in Python.

While I am here, I fixed another minor issue at `DataFrams.schema` together:

```diff
- except AttributeError as e:
-     raise Exception(
-         "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+     raise ValueError(
+         "Unable to parse datatype from schema. %s" % e) from e
```

Now it catches all exceptions during schema parsing, chains the exception with `ValueError`. Previously it only caught `AttributeError` that does not catch all cases.

### Why are the changes needed?

For users to expect the proper exceptions.

### Does this PR introduce _any_ user-facing change?

Yeah, the exception classes became different but should be compatible because previous exception was plain `Exception` which other exceptions inherit.

### How was this patch tested?

Existing unittests should cover,

Closes #31238

Closes #32650 from HyukjinKwon/SPARK-32194.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-26 11:54:40 +09:00
Hyukjin Kwon 7eaabf4df5 [SPARK-35408][PYTHON][FOLLOW-UP] Avoid unnecessary f-string format
### What changes were proposed in this pull request?

This PR avoids using f-string format that's a new feature in Python 3.6. Although it's legitimate to use this syntax because Apache Spark supports Python 3.6+, this breaks unofficial support of Python 3.5.

This specific f-string format looks something unnecessary, and doesn't look worth enough to remove such unofficial support because of one string format in an error message.

**NOTE** that this PR doesn't mean that we're maintaining Python 3.5 since we dropped. It just looks like too much to remove that unofficial support only because of one string format and error message.

### Why are the changes needed?

To keep unofficial Python 3.5 support

### Does this PR introduce _any_ user-facing change?

Officially nope.

### How was this patch tested?

Ran the linters.

Closes #32598 from HyukjinKwon/SPARK-35408=followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-20 10:47:31 +09:00
Gera Shegalov 9eb45ecb4f [SPARK-35408][PYTHON] Improve parameter validation in DataFrame.show
### What changes were proposed in this pull request?
Provide clearer error message tied to the user's Python code if incorrect parameters are passed to `DataFrame.show` rather than the message about a missing JVM method the user is not calling directly.

```
py4j.Py4JException: Method showString([class java.lang.Boolean, class java.lang.Integer, class java.lang.Boolean]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748
```

### Why are the changes needed?
For faster debugging through actionable error message.

### Does this PR introduce _any_ user-facing change?
No change for the correct parameters but different error messages for the parameters triggering an exception.

### How was this patch tested?
- unit test
- manually in PySpark REPL

Closes #32555 from gerashegalov/df_show_validation.

Authored-by: Gera Shegalov <gera@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-05-17 16:22:46 +09:00
Yikun Jiang 44b7931936 [SPARK-35176][PYTHON] Standardize input validation error type
### What changes were proposed in this pull request?
This PR corrects some exception type when the function input params are failed to validate due to TypeError.
In order to convenient to review, there are 3 commits in this PR:
- Standardize input validation error type on sql
- Standardize input validation error type on ml
- Standardize input validation error type on pandas

### Why are the changes needed?
As suggestion from Python exception doc [1]: "Raised when an operation or function is applied to an object of inappropriate type.", but there are many Value error are raised in some pyspark code, this patch fix them.

[1] https://docs.python.org/3/library/exceptions.html#TypeError

Note that: this patch only addresses the exsiting some wrong raise type for input validation, the input validation decorator/framework which mentioned in [SPARK-35176](https://issues.apache.org/jira/browse/SPARK-35176), would be submited in a speparated patch.

### Does this PR introduce _any_ user-facing change?
Yes, code can raise the right TypeError instead of ValueError.

### How was this patch tested?
Existing test case and UT

Closes #32368 from Yikun/SPARK-35176.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-05-03 15:34:24 +09:00
Yikun Jiang b43f7e6a97 [SPARK-35019][PYTHON][SQL] Fix type hints mismatches in pyspark.sql.*
### What changes were proposed in this pull request?
Fix type hints mismatches in pyspark.sql.*

### Why are the changes needed?
There were some mismatches in pyspark.sql.*

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
dev/lint-python passed.

Closes #32122 from Yikun/SPARK-35019.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-04-13 11:21:13 +09:00
Xinrong Meng 747ad1809b [PYTHON][MINOR] Fix docstring of DataFrame.join
### What changes were proposed in this pull request?
Fix docstring of PySpark `DataFrame.join`.

### Why are the changes needed?
For a better view of PySpark documentation.

### Does this PR introduce _any_ user-facing change?
No (only documentation changes).

### How was this patch tested?
Manual test.

From
![image](https://user-images.githubusercontent.com/47337188/106977730-c14ab080-670f-11eb-8df8-5aea90902104.png)

To
![image](https://user-images.githubusercontent.com/47337188/106977834-ed663180-670f-11eb-9c5e-d09be26e0ca8.png)

Closes #31463 from xinrong-databricks/fixDoc.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-02-06 09:08:49 -06:00
zero323 098f2268e4 [SPARK-33730][PYTHON] Standardize warning types
### What changes were proposed in this pull request?

This PR:

- Adds as small  hierarchy of warnings to be used in PySpark applications. These extend built-in classes and top level `PySparkWarning`.
- Replaces `DeprecationWarnings` (intended for developers) with PySpark specific subclasses of `FutureWarning` (intended for end users).

### Why are the changes needed?

- To be more precise and add users additional control (in addition to standard module level filters) over PySpark warnings handling.
- Correct semantics (at the moment we use `DeprecationWarning` in user-facing API, but it is intended "for warnings about deprecated features when those warnings are intended for other Python developers").

### Does this PR introduce _any_ user-facing change?

Yes. Code can raise different type of warning than before.

### How was this patch tested?

Existing tests.

Closes #30985 from zero323/SPARK-33730.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-01-18 09:32:55 +09:00
Josh Soref 13fd272cd3 Spelling r common dev mlib external project streaming resource managers python
### What changes were proposed in this pull request?

This PR intends to fix typos in the sub-modules:
* `R`
* `common`
* `dev`
* `mlib`
* `external`
* `project`
* `streaming`
* `resource-managers`
* `python`

Split per srowen https://github.com/apache/spark/pull/30323#issuecomment-728981618

NOTE: The misspellings have been reported at 706a726f87 (commitcomment-44064356)

### Why are the changes needed?

Misspelled words make it harder to read / understand content.

### Does this PR introduce _any_ user-facing change?

There are various fixes to documentation, etc...

### How was this patch tested?

No testing was performed

Closes #30402 from jsoref/spelling-R_common_dev_mlib_external_project_streaming_resource-managers_python.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-11-27 10:22:45 -06:00
HyukjinKwon 3959f0d987 [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*)
### What changes were proposed in this pull request?

This PR proposes to migrate to [NumPy documentation style](https://numpydoc.readthedocs.io/en/latest/format.html), see also SPARK-33243.
While I am migrating, I also fixed some Python type hints accordingly.

### Why are the changes needed?

For better documentation as text itself, and generated HTMLs

### Does this PR introduce _any_ user-facing change?

Yes, they will see a better format of HTMLs, and better text format. See SPARK-33243.

### How was this patch tested?

Manually tested via running `./dev/lint-python`.

Closes #30181 from HyukjinKwon/SPARK-33250.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-11-03 10:00:49 +09:00
Chuliang Xiao 81d3a8eeca [MINOR][PYTHON] Fix the typo in the docstring of method agg()
### What changes were proposed in this pull request?
Change `df.groupBy.agg()` to `df.groupBy().agg()` in the docstring of `agg()`

### Why are the changes needed?
Fix typo in a docstring

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No

Closes #30060 from ChuliangXiao/patch-1.

Authored-by: Chuliang Xiao <ChuliangX@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-10-15 17:24:22 -07:00
zero323 7fb9f6884f [SPARK-32799][R][SQL] Add allowMissingColumns to SparkR unionByName
### What changes were proposed in this pull request?

Add optional `allowMissingColumns` argument to SparkR `unionByName`.

### Why are the changes needed?

Feature parity.

### Does this PR introduce _any_ user-facing change?

`unionByName` supports `allowMissingColumns`.

### How was this patch tested?

Existing unit tests. New unit tests targeting this feature.

Closes #29813 from zero323/SPARK-32799.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-21 09:39:34 +09:00
Abhishek Dixit 6f36db1fa5 [SPARK-31448][PYTHON] Fix storage level used in persist() in dataframe.py
### What changes were proposed in this pull request?
Since the data is serialized on the Python side, we should make cache() in PySpark dataframes use StorageLevel.MEMORY_AND_DISK mode which has deserialized=false. This change was done to `pyspark/rdd.py` as part of SPARK-2014 but was missed from `pyspark/dataframe.py`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Using existing tests

Closes #29242 from abhishekd0907/SPARK-31448.

Authored-by: Abhishek Dixit <abhishekdixit0907@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-09-15 08:41:22 -05:00
itholic 8bd3770552 [SPARK-32798][PYTHON] Make unionByName optionally fill missing columns with nulls in PySpark
### What changes were proposed in this pull request?

This PR proposes to add new argument `allowMissingColumns` to `unionByName` for allowing users to specify whether to allow missing columns or not.

### Why are the changes needed?

To expose `allowMissingColumns` argument in Python API also. Currently this is only exposed in Scala/Java APIs.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new examples with new argument in the docstring.

### How was this patch tested?

Doctest added and manually tested

```
$ python/run-tests --testnames pyspark.sql.dataframe
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['/.../python3', 'python3.8']
Will test the following Python tests: ['pyspark.sql.dataframe']
/.../python3 python_implementation is CPython
/.../python3 version is: Python 3.8.5
python3.8 python_implementation is CPython
python3.8 version is: Python 3.8.5
Starting test(/.../python3): pyspark.sql.dataframe
Starting test(python3.8): pyspark.sql.dataframe
Finished test(python3.8): pyspark.sql.dataframe (35s)
Finished test(/.../python3): pyspark.sql.dataframe (35s)
Tests passed in 35 seconds
```

Closes #29657 from itholic/SPARK-32798.

Authored-by: itholic <haejoon309@naver.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-08 09:41:02 +09:00
Fokko Driesprong a1e459ed9f [SPARK-32719][PYTHON] Add Flake8 check missing imports
https://issues.apache.org/jira/browse/SPARK-32719

### What changes were proposed in this pull request?

Add a check to detect missing imports. This makes sure that if we use a specific class, it should be explicitly imported (not using a wildcard).

### Why are the changes needed?

To make sure that the quality of the Python code is up to standard.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit-tests and Flake8 static analysis

Closes #29563 from Fokko/fd-add-check-missing-imports.

Authored-by: Fokko Driesprong <fokko@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-31 11:23:31 +09:00
HyukjinKwon 5491c08bf1 Revert "[SPARK-31525][SQL] Return an empty list for df.head() when df is empty"
This reverts commit 44a5258ac2.
2020-07-29 12:07:35 +09:00
Tianshi Zhu 44a5258ac2 [SPARK-31525][SQL] Return an empty list for df.head() when df is empty
### What changes were proposed in this pull request?

return an empty list instead of None when calling `df.head()`

### Why are the changes needed?

`df.head()` and `df.head(1)` are inconsistent when df is empty.

### Does this PR introduce _any_ user-facing change?

Yes. If a user relies on `df.head()` to return None, things like `if df.head() is None:` will be broken.

### How was this patch tested?

Closes #29214 from tianshizz/SPARK-31525.

Authored-by: Tianshi Zhu <zhutianshirea@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-28 12:32:19 +09:00
zero323 ef3cad17a6 [SPARK-29157][SQL][PYSPARK] Add DataFrameWriterV2 to Python API
### What changes were proposed in this pull request?

- Adds `DataFramWriterV2` class.
- Adds `writeTo` method to `pyspark.sql.DataFrame`.
- Adds related SQL partitioning functions (`years`, `months`, ..., `bucket`).

### Why are the changes needed?

Feature parity.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added new unit tests.

TODO: Should we test against `org.apache.spark.sql.connector.InMemoryTableCatalog`? If so, how to expose it in Python tests?

Closes #27331 from zero323/SPARK-29157.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-20 10:42:33 +09:00
HyukjinKwon 4ad9bfd53b [SPARK-32138] Drop Python 2.7, 3.4 and 3.5
### What changes were proposed in this pull request?

This PR aims to drop Python 2.7, 3.4 and 3.5.

Roughly speaking, it removes all the widely known Python 2 compatibility workarounds such as `sys.version` comparison, `__future__`. Also, it removes the Python 2 dedicated codes such as `ArrayConstructor` in Spark.

### Why are the changes needed?

 1. Unsupport EOL Python versions
 2. Reduce maintenance overhead and remove a bit of legacy codes and hacks for Python 2.
 3. PyPy2 has a critical bug that causes a flaky test, SPARK-28358 given my testing and investigation.
 4. Users can use Python type hints with Pandas UDFs without thinking about Python version
 5. Users can leverage one latest cloudpickle, https://github.com/apache/spark/pull/28950. With Python 3.8+ it can also leverage C pickle.

### Does this PR introduce _any_ user-facing change?

Yes, users cannot use Python 2.7, 3.4 and 3.5 in the upcoming Spark version.

### How was this patch tested?

Manually tested and also tested in Jenkins.

Closes #28957 from HyukjinKwon/SPARK-32138.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-14 11:22:44 +09:00
GuoPhilipse f0e6d0ec13 [SPARK-31710][SQL] Fail casting numeric to timestamp by default
## What changes were proposed in this pull request?
we fail casting from numeric to timestamp by default.

## Why are the changes needed?
casting from numeric to timestamp is not a  non-standard,meanwhile it may generate different result between spark and other systems,for example hive

## Does this PR introduce any user-facing change?
Yes,user cannot cast numeric to timestamp directly,user have to use the following function to achieve the same effect:TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS

## How was this patch tested?
unit test added

Closes #28593 from GuoPhilipse/31710-fix-compatibility.

Lead-authored-by: GuoPhilipse <guofei_ok@126.com>
Co-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 08:35:35 +00:00
HyukjinKwon e1d5201140 [SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str) case to be consistent with Scala side
### What changes were proposed in this pull request?

Scala:

```scala
scala> spark.range(10).explain("cost")
```
```
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B)

== Physical Plan ==
*(1) Range (0, 10, step=1, splits=12)
```

PySpark:

```python
>>> spark.range(10).explain("cost")
```
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 333, in explain
    raise TypeError(err_msg)
TypeError: extended (optional) should be provided as bool, got <class 'str'>
```

In addition, it is consistent with other codes too, for example, `DataFrame.sample` also can support `DataFrame.sample(1.0)` and `DataFrame.sample(False)`.

### Why are the changes needed?

To provide the consistent API support across APIs.

### Does this PR introduce _any_ user-facing change?

Nope, it's only changes in unreleased branches.
If this lands to master only, yes, users will be able to set `mode` as `df.explain("...")` in Spark 3.1.

After this PR:

```python
>>> spark.range(10).explain("cost")
```
```
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B)

== Physical Plan ==
*(1) Range (0, 10, step=1, splits=12)
```

### How was this patch tested?

Unittest was added and manually tested as well to make sure:

```python
spark.range(10).explain(True)
spark.range(10).explain(False)
spark.range(10).explain("cost")
spark.range(10).explain(extended="cost")
spark.range(10).explain(mode="cost")
spark.range(10).explain()
spark.range(10).explain(True, "cost")
spark.range(10).explain(1.0)
```

Closes #28711 from HyukjinKwon/SPARK-31895.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-03 12:07:05 +09:00
iRakson 2f92ea0df4 [SPARK-31763][PYSPARK] Add inputFiles method in PySpark DataFrame Class
### What changes were proposed in this pull request?
Adds `inputFiles()` method to PySpark `DataFrame`. Using this, PySpark users can list all files constituting a `DataFrame`.

**Before changes:**

```
>>> spark.read.load("examples/src/main/resources/people.json", format="json").inputFiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/***/***/spark/python/pyspark/sql/dataframe.py", line 1388, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'inputFiles'
```

**After changes:**

```
>>> spark.read.load("examples/src/main/resources/people.json", format="json").inputFiles()
[u'file:///***/***/spark/examples/src/main/resources/people.json']
```

### Why are the changes needed?
This method is already supported for spark with scala and java.

### Does this PR introduce _any_ user-facing change?
Yes, Now users can list all files of a DataFrame using `inputFiles()`

### How was this patch tested?
UT added.

Closes #28652 from iRakson/SPARK-31763.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-05-28 09:52:08 +09:00
David Toneian acab558e55 [SPARK-31739][PYSPARK][DOCS][MINOR] Fix docstring syntax issues and misplaced space characters
This commit is published into the public domain.

### What changes were proposed in this pull request?
Some syntax issues in docstrings have been fixed.

### Why are the changes needed?
In some places, the documentation did not render as intended, e.g. parameter documentations were not formatted as such.

### Does this PR introduce any user-facing change?
Slight improvements in documentation.

### How was this patch tested?
Manual testing and `dev/lint-python` run. No new Sphinx warnings arise due to this change.

Closes #28559 from DavidToneian/SPARK-31739.

Authored-by: David Toneian <david@toneian.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-05-18 20:25:02 +09:00
gatorsmile 3884455780 [SPARK-31087] [SQL] Add Back Multiple Removed APIs
### What changes were proposed in this pull request?

Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html) , this PR is to add back the following APIs whose maintenance cost are relatively small.

- functions.toDegrees/toRadians
- functions.approxCountDistinct
- functions.monotonicallyIncreasingId
- Column.!==
- Dataset.explode
- Dataset.registerTempTable
- SQLContext.getOrCreate, setActive, clearActive, constructors

Below is the other removed APIs in the original PR, but not added back in this PR [https://issues.apache.org/jira/browse/SPARK-25908]:

- Remove some AccumulableInfo .apply() methods
- Remove non-label-specific multiclass precision/recall/fScore in favor of accuracy
- Remove unused Python StorageLevel constants
- Remove unused multiclass option in libsvm parsing
- Remove references to deprecated spark configs like spark.yarn.am.port
- Remove TaskContext.isRunningLocally
- Remove ShuffleMetrics.shuffle* methods
- Remove BaseReadWrite.context in favor of session

### Why are the changes needed?
Avoid breaking the APIs that are commonly used.

### Does this PR introduce any user-facing change?
Adding back the APIs that were removed in 3.0 branch does not introduce the user-facing changes, because Spark 3.0 has not been released.

### How was this patch tested?
Added a new test suite for these APIs.

Author: gatorsmile <gatorsmile@gmail.com>
Author: yi.wu <yi.wu@databricks.com>

Closes #27821 from gatorsmile/addAPIBackV2.
2020-03-28 22:05:16 -07:00
Eric Wu 1f0300fb16 [SPARK-30764][SQL] Improve the readability of EXPLAIN FORMATTED style
### What changes were proposed in this pull request?
The style of `EXPLAIN FORMATTED` output needs to be improved. We’ve already got some observations/ideas in
https://github.com/apache/spark/pull/27368#discussion_r376694496
https://github.com/apache/spark/pull/27368#discussion_r376927143

Observations/Ideas:
1. Using comma as the separator is not clear, especially commas are used inside the expressions too.
2. Show the column counts first? For example, `Results [4]: …`
3. Currently the attribute names are automatically generated, this need to refined.
4. Add arguments field in common implementations as `EXPLAIN EXTENDED` did by calling `argString` in `TreeNode.simpleString`. This will eliminate most existing minor differences between
`EXPLAIN EXTENDED` and `EXPLAIN FORMATTED`.
5. Another improvement we can do is: the generated alias shouldn't include attribute id. collect_set(val, 0, 0)#123 looks clearer than collect_set(val#456, 0, 0)#123

This PR is currently addressing comments 2 & 4, and open for more discussions on improving readability.

### Why are the changes needed?
The readability of `EXPLAIN FORMATTED` need to be improved, which will help user better understand the query plan.

### Does this PR introduce any user-facing change?
Yes, `EXPLAIN FORMATTED` output style changed.

### How was this patch tested?
Update expect results of test cases in explain.sql

Closes #27509 from Eric5553/ExplainFormattedRefine.

Authored-by: Eric Wu <492960551@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 23:36:14 +08:00
Liang Zhang d8c0599e54 [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset
### What changes were proposed in this pull request?
This PR added two DeveloperApis to the Dataset[T] class. Both methods are just exposing lower-level methods to the Dataset[T] class.

### Why are the changes needed?
They are useful for checking whether two dataframes are the same when implementing dataframe caching in python, and also get a unique ID. It's easier to use if we wrap the lower-level APIs.

### Does this PR introduce any user-facing change?
```
scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]

scala> df1.semanticHash
res0: Int = 594427822

scala> df2.semanticHash
res1: Int = 594427822

scala> df1.sameSemantics(df2)
res2: Boolean = true

scala> df1.sameSemantics(df3)
res3: Boolean = false

scala> df3.semanticHash
res4: Int = -1592702048

scala> df4.semanticHash
res5: Int = -1592702048

scala> df4.sameSemantics(df3)
res6: Boolean = true
```

### How was this patch tested?
Unit test in scala and doctest in python.

Note: comments are copied from the corresponding lower-level APIs.
Note: There are some issues to be fixed that would improve the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028

Closes #27565 from liangz1/df-same-result.

Authored-by: Liang Zhang <liang.zhang@databricks.com>
Signed-off-by: WeichenXu <weichen.xu@databricks.com>
2020-02-18 09:22:26 +08:00
HyukjinKwon a6bdea3ad4 [SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark
### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/26809 added `Dataset.tail` API. It should be good to have it in PySpark API as well.

### Why are the changes needed?

To support consistent APIs.

### Does this PR introduce any user-facing change?

No. It adds a new API.

### How was this patch tested?

Manually tested and doctest was added.

Closes #27251 from HyukjinKwon/SPARK-30539.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-01-18 00:18:12 -08:00
jiake b389b8c5f0 [SPARK-30188][SQL] Resolve the failed unit tests when enable AQE
### What changes were proposed in this pull request?
Fix all the failed tests when enable AQE.

### Why are the changes needed?
Run more tests with AQE to catch bugs, and make it easier to enable AQE by default in the future.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing unit tests

Closes #26813 from JkSelf/enableAQEDefault.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-01-13 22:55:19 +08:00
HyukjinKwon 92a0877ee1 [SPARK-30464][PYTHON][DOCS] Explicitly note that we don't add "pandas compatible" aliases
### What changes were proposed in this pull request?

This PR adds a note that we're not adding "pandas compatible" aliases anymore.

### Why are the changes needed?

We added "pandas compatible" aliases as of https://github.com/apache/spark/pull/5544 and https://github.com/apache/spark/pull/6066 . There are too many differences and I don't think it makes sense to add such aliases anymore at this moment.

I was even considering deprecating them out but decided to take a more conservative approache by just documenting it.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests should cover.

Closes #27142 from HyukjinKwon/SPARK-30464.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-01-09 11:42:52 +09:00
HyukjinKwon ee8d661058 [SPARK-30434][PYTHON][SQL] Move pandas related functionalities into 'pandas' sub-package
### What changes were proposed in this pull request?

This PR proposes to move pandas related functionalities into pandas package. Namely:

```bash
pyspark/sql/pandas
├── __init__.py
├── conversion.py  # Conversion between pandas <> PySpark DataFrames
├── functions.py   # pandas_udf
├── group_ops.py   # Grouped UDF / Cogrouped UDF + groupby.apply, groupby.cogroup.apply
├── map_ops.py     # Map Iter UDF + mapInPandas
├── serializers.py # pandas <> PyArrow serializers
├── types.py       # Type utils between pandas <> PyArrow
└── utils.py       # Version requirement checks
```

In order to separately locate `groupby.apply`, `groupby.cogroup.apply`, `mapInPandas`, `toPandas`, and `createDataFrame(pdf)` under `pandas` sub-package, I had to use a mix-in approach which Scala side uses often by `trait`, and also pandas itself uses this approach (see `IndexOpsMixin` as an example) to group related functionalities. Currently, you can think it's like Scala's self typed trait. See the structure below:

```python
class PandasMapOpsMixin(object):
    def mapInPandas(self, ...):
        ...
        return ...

    # other Pandas <> PySpark APIs
```

```python
class DataFrame(PandasMapOpsMixin):

    # other DataFrame APIs equivalent to Scala side.

```

Yes, This is a big PR but they are mostly just moving around except one case `createDataFrame` which I had to split the methods.

### Why are the changes needed?

There are pandas functionalities here and there and I myself gets lost where it was. Also, when you have to make a change commonly for all of pandas related features, it's almost impossible now.

Also, after this change, `DataFrame` and `SparkSession` become more consistent with Scala side since pandas is specific to Python, and this change separates pandas-specific APIs away from `DataFrame` or `SparkSession`.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests should cover. Also, I manually built the PySpark API documentation and checked.

Closes #27109 from HyukjinKwon/pandas-refactoring.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-01-09 10:22:50 +09:00
HyukjinKwon 0a2afcec7d [SPARK-30200][SQL][FOLLOW-UP] Expose only explain(mode: String) in Scala side, and clean up related codes
### What changes were proposed in this pull request?

This PR mainly targets:

1. Expose only explain(mode: String) in Scala side
2. Clean up related codes
    - Hide `ExplainMode` under private `execution` package. No particular reason but just because `ExplainUtils` exists there
    - Use `case object` + `trait` pattern in `ExplainMode` to look after `ParseMode`.
    -  Move `Dataset.toExplainString` to `QueryExecution.explainString` to look after `QueryExecution.simpleString`, and deduplicate the codes at `ExplainCommand`.
    - Use `ExplainMode` in `ExplainCommand` too.
    - Add `explainString` to `PythonSQLUtils` to avoid unexpected test failure of PySpark during refactoring Scala codes side.

### Why are the changes needed?

To minimised exposed APIs, deduplicate, and clean up.

### Does this PR introduce any user-facing change?

`Dataset.explain(mode: ExplainMode)` will be removed (which only exists in master).

### How was this patch tested?

Manually tested and existing tests should cover.

Closes #26898 from HyukjinKwon/SPARK-30200-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-16 14:42:35 +09:00
Takeshi Yamamuro f483a13d4a [SPARK-30231][SQL][PYTHON][FOLLOWUP] Make error messages clear in PySpark df.explain
### What changes were proposed in this pull request?

This pr is a followup of #26861 to address minor comments from viirya.

### Why are the changes needed?

For better error messages.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested.

Closes #26886 from maropu/SPARK-30231-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-12-14 14:26:50 -08:00
Takeshi Yamamuro 64c7b94d64 [SPARK-30231][SQL][PYTHON] Support explain mode in PySpark df.explain
### What changes were proposed in this pull request?

This pr intends to support explain modes implemented in #26829 for PySpark.

### Why are the changes needed?

For better debugging info. in PySpark dataframes.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UTs.

Closes #26861 from maropu/ExplainModeInPython.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-13 17:44:23 +09:00
David 8e9bfea107 [SPARK-29188][PYTHON] toPandas (without Arrow) gets wrong dtypes when applied on empty DF
### What changes were proposed in this pull request?

An empty Spark DataFrame converted to a Pandas DataFrame wouldn't have the right column types. Several type mappings were missing.

### Why are the changes needed?

Empty Spark DataFrames can be used to write unit tests, and verified by converting them to Pandas first. But this can fail when the column types are wrong.

### Does this PR introduce any user-facing change?

Yes; the error reported in the JIRA issue should not happen anymore.

### How was this patch tested?

Through unit tests in `pyspark.sql.tests.test_dataframe.DataFrameTests#test_to_pandas_from_empty_dataframe`

Closes #26747 from dlindelof/SPARK-29188.

Authored-by: David <dlindelof@expediagroup.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-12 20:49:10 +09:00
Matt Stillwell 1e1b7302f4 [MINOR][PYSPARK][DOCS] Fix typo in example documentation
### What changes were proposed in this pull request?

I propose that we change the example code documentation to call the proper function .
For example, under the `foreachBatch` function, the example code was calling the `foreach()` function by mistake.

### Why are the changes needed?

I suppose it could confuse some people, and it is a typo

### Does this PR introduce any user-facing change?

No, there is no "meaningful" code being change, simply the documentation

### How was this patch tested?

I made the change on a fork and it still worked

Closes #26299 from mstill3/patch-1.

Authored-by: Matt Stillwell <18670089+mstill3@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-01 11:55:29 -07:00
Holden Karau 42050c3f4f [SPARK-27659][PYTHON] Allow PySpark to prefetch during toLocalIterator
### What changes were proposed in this pull request?

This PR allows Python toLocalIterator to prefetch the next partition while the first partition is being collected. The PR also adds a demo micro bench mark in the examples directory, we may wish to keep this or not.

### Why are the changes needed?

In https://issues.apache.org/jira/browse/SPARK-23961 / 5e79ae3b40 we changed PySpark to only pull one partition at a time. This is memory efficient, but if partitions take time to compute this can mean we're spending more time blocking.

### Does this PR introduce any user-facing change?

A new param is added to toLocalIterator

### How was this patch tested?

New unit test inside of `test_rdd.py` checks the time that the elements are evaluated at. Another test that the results remain the same are added to `test_dataframe.py`.

I also ran a micro benchmark in the examples directory `prefetch.py` which shows an improvement of ~40% in this specific use case.

>
> 19/08/16 17:11:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> Running timers:
>
> [Stage 32:>                                                         (0 + 1) / 1]
> Results:
>
> Prefetch time:
>
> 100.228110831
>
>
> Regular time:
>
> 188.341721614
>
>
>

Closes #25515 from holdenk/SPARK-27659-allow-pyspark-tolocalitr-to-prefetch.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2019-09-20 09:59:31 -07:00
Liang-Chi Hsieh 707411f479 [SPARK-28378][PYTHON] Remove usage of cgi.escape
## What changes were proposed in this pull request?

`cgi.escape` is deprecated [1], and removed at 3.8 [2]. We better to replace it.

[1] https://docs.python.org/3/library/cgi.html#cgi.escape.
[2] https://docs.python.org/3.8/whatsnew/3.8.html#api-and-feature-removals

## How was this patch tested?

Existing tests.

Closes #25142 from viirya/remove-cgi-escape.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-14 15:26:00 +09:00
HyukjinKwon cdbc30213b [SPARK-28226][PYTHON] Document Pandas UDF mapInPandas
## What changes were proposed in this pull request?

This PR proposes to document `MAP_ITER` with `mapInPandas`.

## How was this patch tested?

Manually checked the documentation.

![Screen Shot 2019-07-05 at 1 52 30 PM](https://user-images.githubusercontent.com/6477701/60698812-26cf2d80-9f2c-11e9-8295-9c00c28f5569.png)

![Screen Shot 2019-07-05 at 1 48 53 PM](https://user-images.githubusercontent.com/6477701/60698710-ac061280-9f2b-11e9-8521-a4f361207e06.png)

Closes #25025 from HyukjinKwon/SPARK-28226.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-07 09:07:52 +09:00
HyukjinKwon fe75ff8bea [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation
## What changes were proposed in this pull request?

Seems like we used to generate PySpark API documentation by Epydoc almost at the very first place (see 85b8f2c64f).

This fixes an actual issue:

Before:

![Screen Shot 2019-07-05 at 8 20 01 PM](https://user-images.githubusercontent.com/6477701/60720491-e9879180-9f65-11e9-9562-100830a456cd.png)

After:

![Screen Shot 2019-07-05 at 8 20 05 PM](https://user-images.githubusercontent.com/6477701/60720495-ec828200-9f65-11e9-8277-8f689e292cb0.png)

It seems apparently a bug within `epytext` plugin during the conversion between`param` and `:param` syntax. See also [Epydoc syntax](http://epydoc.sourceforge.net/manual-epytext.html).

Actually, Epydoc syntax violates [PEP-257](https://www.python.org/dev/peps/pep-0257/) IIRC and blocks us to enable some rules for doctest linter as well.

We should remove this legacy away and I guess Spark 3 is good timing to do it.

## How was this patch tested?

Manually built the doc and check each.

I had to manually find the Epydoc syntax by `git grep -r "{L"`, for instance.

Closes #25060 from HyukjinKwon/SPARK-28206.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-07-05 10:08:22 -07:00
HyukjinKwon 5c55812400 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
## What changes were proposed in this pull request?

This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type .

Had an offline discussion with rxin, mengxr and cloud-fan

The reason is basically:

1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764

Renaming `SCALAR_ITER` as `MAP_ITER` is abandoned due to 2. reason.

For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic with `mapPartitionsInPandas`.

## How was this patch tested?

Existing tests should cover.

Closes #25044 from HyukjinKwon/SPARK-28198.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-05 09:22:41 +09:00
HyukjinKwon 02f4763286 [SPARK-28198][PYTHON] Add mapPartitionsInPandas to allow an iterator of DataFrames
## What changes were proposed in this pull request?

This PR proposes to add `mapPartitionsInPandas` API to DataFrame by using existing `SCALAR_ITER` as below:

1. Filtering via setting the column

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapPartitionsInPandas(filter_func).show()
```

```
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+
```

2. `DataFrame.loc`

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df = spark.createDataFrame([['aa'], ['bb'], ['cc'], ['aa'], ['aa'], ['aa']], ["value"])

pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        yield pdf.loc[pdf.value.str.contains('^a'), :]

df.mapPartitionsInPandas(filter_func).show()
```

```
+-----+
|value|
+-----+
|   aa|
|   aa|
|   aa|
|   aa|
+-----+
```

3. `pandas.melt`

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df = spark.createDataFrame(
    pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                  'B': {0: 1, 1: 3, 2: 5},
                  'C': {0: 2, 1: 4, 2: 6}}))

pandas_udf("A string, variable string, value long", PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        import pandas as pd
        yield pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])

df.mapPartitionsInPandas(filter_func).show()
```

```
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+
```

The current limitation of `SCALAR_ITER` is that it doesn't allow different length of result, which is pretty critical in practice - for instance, we cannot simply filter by using Pandas APIs but we merely just map N to N. This PR allows map N to M like flatMap.

This API mimics the way of `mapPartitions` but keeps API shape of `SCALAR_ITER` by allowing different results.

### How does this PR implement?

This PR adds mimics both `dapply` with Arrow optimization and Grouped Map Pandas UDF. At Python execution side, it reuses existing `SCALAR_ITER` code path.

Therefore, externally, we don't introduce any new type of Pandas UDF but internally we use another evaluation type code `205` (`SQL_MAP_PANDAS_ITER_UDF`).

This approach is similar with Pandas' Windows function implementation with Grouped Aggregation Pandas UDF functions - internally we have `203` (`SQL_WINDOW_AGG_PANDAS_UDF`) but externally we just share the same `GROUPED_AGG`.

## How was this patch tested?

Manually tested and unittests were added.

Closes #24997 from HyukjinKwon/scalar-udf-iter.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-02 10:54:16 +09:00
Bryan Cutler c277afb12b [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors
## What changes were proposed in this pull request?

Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if the Spark job being run in the background serving thread errors, it will be caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it is only catch a SparkException, it won't handle an error that occurs in the serializer, and each method has to have it's own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving port and authentication info, so that it allows the Python caller to join with the serving thread. During the call to join, the serving thread Future is completed either successfully or with an exception. In the latter case, the exception will be propagated to Python through the Py4j call.

## How was this patch tested?

Existing tests

Closes #24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-26 13:05:41 -07:00
HyukjinKwon db48da87f0 [SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations
## What changes were proposed in this pull request?

`spark.sql.execution.arrow.enabled` was added when we add PySpark arrow optimization.
Later, in the current master, SparkR arrow optimization was added and it's controlled by the same configuration `spark.sql.execution.arrow.enabled`.

There look two issues about this:

1. `spark.sql.execution.arrow.enabled` in PySpark was added from 2.3.0 whereas SparkR optimization was added 3.0.0. The stability is different so it's problematic when we change the default value for one of both optimization first.

2. Suppose users want to share some JVM by PySpark and SparkR. They are currently forced to use the optimization for all or none if the configuration is set globally.

This PR proposes two separate configuration groups for PySpark and SparkR about Arrow optimization:

- Deprecate `spark.sql.execution.arrow.enabled`
- Add `spark.sql.execution.arrow.pyspark.enabled` (fallback to `spark.sql.execution.arrow.enabled`)
- Add `spark.sql.execution.arrow.sparkr.enabled`
- Deprecate `spark.sql.execution.arrow.fallback.enabled`
- Add `spark.sql.execution.arrow.pyspark.fallback.enabled ` (fallback to `spark.sql.execution.arrow.fallback.enabled`)

Note that `spark.sql.execution.arrow.maxRecordsPerBatch` is used within JVM side for both.
Note that `spark.sql.execution.arrow.fallback.enabled` was added due to behaviour change. We don't need it in SparkR - SparkR side has the automatic fallback.

## How was this patch tested?

Manually tested and some unittests were added.

Closes #24700 from HyukjinKwon/separate-sparkr-arrow.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-03 10:01:37 +09:00
Bryan Cutler 5e79ae3b40 [SPARK-23961][SPARK-27548][PYTHON] Fix error when toLocalIterator goes out of scope and properly raise errors from worker
## What changes were proposed in this pull request?

This fixes an error when a PySpark local iterator, for both RDD and DataFrames, goes out of scope and the connection is closed before fully consuming the iterator. The error occurs on the JVM in the serving thread, when Python closes the local socket while the JVM is writing to it. This usually happens when there is enough data to fill the socket read buffer, causing the write call to block.

Additionally, this fixes a problem when an error occurs in the Python worker and the collect job is cancelled with an exception. Previously, the Python driver was never notified of the error so the user could get a partial result (iteration until the error) and the application will continue. With this change, an error in the worker is sent to the Python iterator and is then raised.

The change here introduces a protocol for PySpark local iterators that work as follows:

1) The local socket connection is made when the iterator is created
2) When iterating, Python first sends a request for partition data as a non-zero integer
3) While the JVM local iterator over partitions has next, it triggers a job to collect the next partition
4) The JVM sends a nonzero response to indicate it has the next partition to send
5) The next partition is sent to Python and read by the PySpark deserializer
6) After sending the entire partition, an `END_OF_DATA_SECTION` is sent to Python which stops the deserializer and allows to make another request
7) When the JVM gets a request from Python but has already consumed it's local iterator, it will send a zero response to Python and both will close the socket cleanly
8) If an error occurs in the worker, a negative response is sent to Python followed by the error message. Python will then raise a RuntimeError with the message, stopping iteration.
9) When the PySpark local iterator is garbage-collected, it will read any remaining data from the current partition (this is data that has already been collected) and send a request of zero to tell the JVM to stop collection jobs and close the connection.

Steps 1, 3, 5, 6 are the same as before. Step 8 was completely missing before because errors in the worker were never communicated back to Python. The other steps add synchronization to allow for a clean closing of the socket, with a small trade-off in performance for each partition. This is mainly because the JVM does not start collecting partition data until it receives a request to do so, where before it would eagerly write all data until the socket receive buffer is full.

## How was this patch tested?

Added new unit tests for DataFrame and RDD `toLocalIterator` and tested not fully consuming the iterator. Manual tests with Python 2.7  and 3.6.

Closes #24070 from BryanCutler/pyspark-toLocalIterator-clean-stop-SPARK-23961.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-05-07 14:47:39 -07:00
Bryan Cutler d36cce18e2 [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds
## What changes were proposed in this pull request?

This increases the minimum support version of pyarrow to 0.12.1 and removes workarounds in pyspark to remain compatible with prior versions. This means that users will need to have at least pyarrow 0.12.1 installed and available in the cluster or an `ImportError` will be raised to indicate an upgrade is needed.

## How was this patch tested?

Existing tests using:
Python 2.7.15, pyarrow 0.12.1, pandas 0.24.2
Python 3.6.7, pyarrow 0.12.1, pandas 0.24.0

Closes #24298 from BryanCutler/arrow-bump-min-pyarrow-SPARK-27276.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-22 19:30:31 +09:00
Hyukjin Kwon d7dd59a6b4 [SPARK-26224][SQL][PYTHON][R][FOLLOW-UP] Add notes about many projects in withColumn at SparkR and PySpark as well
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/23285. This PR adds the notes into PySpark and SparkR documentation as well.

While I am here, I revised the doc a bit to make it sound a bit more neutral

## How was this patch tested?

Manually built the doc and verified.

Closes #24272 from HyukjinKwon/SPARK-26224.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-04-03 08:30:24 +09:00
Maxim Gekk 027ed2d11b [SPARK-23643][CORE][SQL][ML] Shrinking the buffer in hashSeed up to size of the seed parameter
## What changes were proposed in this pull request?

The hashSeed method allocates 64 bytes instead of 8. Other bytes are always zeros (thanks to default behavior of ByteBuffer). And they could be excluded from hash calculation because they don't differentiate inputs.

## How was this patch tested?

By running the existing tests - XORShiftRandomSuite

Closes #20793 from MaxGekk/hash-buff-size.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-03-23 11:26:09 -05:00
Dilip Biswal 7a136f8670 [SPARK-27096][SQL][FOLLOWUP] Do the correct validation of join types in R side and fix join docs for scala, python and r
## What changes were proposed in this pull request?
This is a minor follow-up PR for SPARK-27096. The original PR reconciled the join types supported between dataset and sql interface. In case of R, we do the join type validation in the R side. In this PR we do the correct validation and adds tests in R to test all the join types along with the error condition. Along with this, i made the necessary doc correction.

## How was this patch tested?
Add R tests.

Closes #24087 from dilipbiswal/joinfix_followup.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-03-16 13:04:54 +09:00
Hellsen83 387efe29b7 [SPARK-26449][PYTHON] Add transform method to DataFrame API
## What changes were proposed in this pull request?

Added .transform() method to Python DataFrame API to be in sync with Scala API.

## How was this patch tested?

Addition has been tested manually.

Closes #23877 from Hellsen83/pyspark-dataframe-transform.

Authored-by: Hellsen83 <erik.christiansen83@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-02-26 18:22:36 -06:00
Takuya UESHIN 4a4e7aeca7 [SPARK-26887][SQL][PYTHON][NS] Create datetime.date directly instead of creating datetime64 as intermediate data.
## What changes were proposed in this pull request?

Currently `DataFrame.toPandas()` with arrow enabled or `ArrowStreamPandasSerializer` for pandas UDF with pyarrow<0.12 creates `datetime64[ns]` type series as intermediate data and then convert to `datetime.date` series, but the intermediate `datetime64[ns]` might cause an overflow even if the date is valid.

```
>>> import datetime
>>>
>>> t = [datetime.date(2262, 4, 12), datetime.date(2263, 4, 12)]
>>>
>>> df = spark.createDataFrame(t, 'date')
>>> df.show()
+----------+
|     value|
+----------+
|2262-04-12|
|2263-04-12|
+----------+

>>>
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>>
>>> df.toPandas()
        value
0  1677-09-21
1  1678-09-21
```

We should avoid creating such intermediate data and create `datetime.date` series directly instead.

## How was this patch tested?

Modified some tests to include the date which overflow caused by the intermediate conversion.
Run tests with pyarrow 0.8, 0.10, 0.11, 0.12 in my local environment.

Closes #23795 from ueshin/issues/SPARK-26887/date_as_object.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-02-18 11:48:10 +08:00
Bryan Cutler ecaa495b1f [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance
## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out-of-order must be buffered before they can be send to Python. This causes an excess of memory to be used in the driver JVM and increases the time it takes to complete because data must sit in the JVM waiting for preceding partitions to come in.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered at the JVM and there is no waiting on particular partitions so performance will be increased.

Followup to #21546

## How was this patch tested?

Added new test with a large number of batches per partition, and test that forces a small delay in the first partition. These test that partitions are collected out-of-order and then are are put in the correct order in Python.

## Performance Tests - toPandas

Tests run on a 4 node standalone cluster with 32 cores total, 14.04.1-Ubuntu and OpenJDK 8
measured wall clock time to execute `toPandas()` and took the average best time of 5 runs/5 loops each.

Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
	start = time.time()
	_ = df.toPandas()
	elapsed = time.time() - start
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream | This PR
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master | Avg This PR
------------------|--------------
5.1134364 | 4.3898886

Speedup of **1.164821449**

Closes #22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-12-06 10:07:28 -08:00