Commit graph

2302 commits

Liang-Chi Hsieh 12e1583093 [SPARK-28927][ML] Rethrow block mismatch exception in ALS when input data is nondeterministic
### What changes were proposed in this pull request?

Fitting an ALS model can fail due to nondeterministic input data. Currently the failure surfaces as an ArrayIndexOutOfBoundsException, which does not explain to end users what went wrong during fitting.

This patch catches this exception and rethrows a more explainable one when the input data is nondeterministic.

Because we may not know the exact deterministic level of RDDs produced by user code, this patch also adds a note to the Scala/Python/R ALS documentation about the deterministic level of the training data.
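
For illustration, a minimal PySpark sketch of the documented guidance: pin down a nondeterministic input (here, a sample) by checkpointing it before fitting, so reruns after fetch failures see the same rows. `raw_ratings` and the checkpoint path are hypothetical.

```python
from pyspark.ml.recommendation import ALS

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # illustrative path
# Sampling is nondeterministic on recompute; checkpointing fixes the rows.
ratings = raw_ratings.sample(False, 0.5).checkpoint()    # raw_ratings is assumed

als = ALS(userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(ratings)
```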

### Why are the changes needed?

An ArrayIndexOutOfBoundsException was observed while fitting an ALS model. It was caused by mismatches between in/out user/item blocks while computing ratings.

If the training RDD's output is nondeterministic, then when a fetch failure happens, rerunning part of the training RDD can produce inconsistent user/item blocks.

This patch is needed to notify users when ALS fitting runs on nondeterministic input.

### Does this PR introduce any user-facing change?

Yes. When fitting an ALS model on nondeterministic input data, previously, if a rerun happened, users would see an ArrayIndexOutOfBoundsException caused by a mismatch between in/out user/item blocks.

After this patch, a SparkException with a clearer message is thrown, wrapping the original ArrayIndexOutOfBoundsException.

### How was this patch tested?

Tested on a development cluster.

Closes #25789 from viirya/als-indeterminate-input.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-18 09:22:13 -05:00
Chris Martin 05988b256e [SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs
### What changes were proposed in this pull request?

Adds a new cogroup Pandas UDF. This allows two grouped DataFrames to be cogrouped together and a (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame UDF to be applied to each cogroup.

**Example usage**

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df1 = spark.createDataFrame(
   [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
   ("time", "id", "v1"))

df2 = spark.createDataFrame(
   [(20000101, 1, "x"), (20000101, 2, "y")],
   ("time", "id", "v2"))

@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(l, r):
   return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
```

```
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+
```

### How was this patch tested?

Added unit test `test_pandas_udf_cogrouped_map`.

Closes #24981 from d80tb7/SPARK-27463-poc-arrow-stream.

Authored-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-09-17 17:13:50 -07:00
zhengruifeng 4d27a25908 [SPARK-22797][ML][PYTHON] Bucketizer support multi-column
### What changes were proposed in this pull request?
Add multi-column support to Bucketizer on the Python side.
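
A minimal usage sketch, assuming the new Python params mirror the Scala side's `inputCols`/`outputCols`/`splitsArray`:

```python
from pyspark.ml.feature import Bucketizer

df = spark.createDataFrame(
    [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3)], ["values1", "values2"])

bucketizer = Bucketizer(
    inputCols=["values1", "values2"],
    outputCols=["buckets1", "buckets2"],
    splitsArray=[  # one splits array per input column
        [-float("inf"), 0.5, 1.4, float("inf")],
        [-float("inf"), 0.5, float("inf")]])

bucketizer.transform(df).show()
```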

### Why are the changes needed?
Bucketizer should support multiple columns, as the Scala side does.

### Does this PR introduce any user-facing change?
Yes, this PR adds a new Python API.

### How was this patch tested?
Added test suites.

Closes #25801 from zhengruifeng/20542_py.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2019-09-17 11:52:20 +08:00
Huaxin Gao 77e9b58d4f [SPARK-28969][PYTHON][ML] OneVsRestParams parity between scala and python
### What changes were proposed in this pull request?
Following the Scala ```OneVsRestParams``` implementation, move ```setClassifier``` from ```OneVsRestParams``` to ```OneVsRest``` in PySpark.

### Why are the changes needed?
1. Maintain the parity between scala and python code.
2. ```Classifier``` can only be set in the estimator.

### Does this PR introduce any user-facing change?
Yes.
Previous behavior: ```OneVsRestModel``` has the method ```setClassifier```.
Current behavior: ```setClassifier``` is removed from ```OneVsRestModel```; ```classifier``` can only be set on ```OneVsRest```.
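
For illustration, a minimal sketch of the resulting usage (`train_df` is a hypothetical labeled DataFrame):

```python
from pyspark.ml.classification import LogisticRegression, OneVsRest

lr = LogisticRegression(maxIter=10)
ovr = OneVsRest(classifier=lr)   # or: ovr.setClassifier(lr), on the estimator
model = ovr.fit(train_df)        # the fitted OneVsRestModel has no setClassifier
```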

### How was this patch tested?
Used existing tests.

Closes #25715 from huaxingao/spark-28969.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-13 12:29:19 -05:00
Wenchen Fan 053dd858d3 [SPARK-28998][SQL] reorganize the packages of DS v2 interfaces/classes
### What changes were proposed in this pull request?

reorganize the packages of DS v2 interfaces/classes:
1. `org.apache.spark.sql.connector.catalog`: put `TableCatalog`, `Table` and other related interfaces/classes
2. `org.apache.spark.sql.connector.expression`: put `Expression`, `Transform` and other related interfaces/classes
3. `org.apache.spark.sql.connector.read`: put `ScanBuilder`, `Scan` and other related interfaces/classes
4. `org.apache.spark.sql.connector.write`: put `WriteBuilder`, `BatchWrite` and other related interfaces/classes

### Why are the changes needed?

Data Source V2 has evolved a lot. It's a bit weird that `Expression` is in `org.apache.spark.sql.catalog.v2` and `Table` is in `org.apache.spark.sql.sources.v2`.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

existing tests

Closes #25700 from cloud-fan/package.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-09-12 19:59:34 +08:00
dengziming 8f632d7045 [MINOR][DOCS] Fix few typos in the java docs
JIRA: https://issues.apache.org/jira/browse/SPARK-29050

- 'a hdfs' changed to 'an hdfs'
- 'an unique' changed to 'a unique'
- 'an url' changed to 'a url'
- 'a error' changed to 'an error'

Closes #25756 from dengziming/feature_fix_typos.

Authored-by: dengziming <dengziming@growingio.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-12 09:30:03 +09:00
HyukjinKwon 7ce0f2b499 [SPARK-29041][PYTHON] Allows createDataFrame to accept bytes as binary type
### What changes were proposed in this pull request?

This PR proposes to allow `bytes` as an acceptable type for binary type for `createDataFrame`.

### Why are the changes needed?

`bytes` is a standard type for binary in Python. This should be respected on the PySpark side.

### Does this PR introduce any user-facing change?

Yes. _When the specified type is binary_, we now allow `bytes` as a binary type. Previously this was not allowed in either Python 2 or Python 3, as below:

```python
spark.createDataFrame([[b"abcd"]], "col binary")
```

in Python 3

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 787, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 442, in _createFromLocal
    data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 769, in prepare
    verify_func(obj)
  File "/.../forked/spark/python/pyspark/sql/types.py", line 1403, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1384, in verify_struct
    verifier(v)
  File "/.../spark/python/pyspark/sql/types.py", line 1403, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1397, in verify_default
    verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1282, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field col: BinaryType can not accept object b'abcd' in type <class 'bytes'>
```

in Python 2:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 787, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 442, in _createFromLocal
    data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 769, in prepare
    verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1403, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1384, in verify_struct
    verifier(v)
  File "/.../spark/python/pyspark/sql/types.py", line 1403, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1397, in verify_default
    verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1282, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field col: BinaryType can not accept object 'abcd' in type <type 'str'>
```

So, it won't break anything.
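
After this patch, the same call is expected to succeed:

```python
spark.createDataFrame([[b"abcd"]], "col binary").show()
```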

### How was this patch tested?

Unit tests were added, and it was also manually tested as below.

```bash
./run-tests --python-executables=python2,python3 --testnames "pyspark.sql.tests.test_serde"
```

Closes #25749 from HyukjinKwon/SPARK-29041.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-12 08:52:25 +09:00
Sean Owen 6378d4bc06 [SPARK-28980][CORE][SQL][STREAMING][MLLIB] Remove most items deprecated in Spark 2.2.0 or earlier, for Spark 3
### What changes were proposed in this pull request?

- Remove SQLContext.createExternalTable and Catalog.createExternalTable, deprecated in favor of createTable since 2.2.0, plus tests of deprecated methods
- Remove HiveContext, deprecated in 2.0.0, in favor of `SparkSession.builder.enableHiveSupport`
- Remove deprecated KinesisUtils.createStream methods, plus tests of deprecated methods, deprecated in 2.2.0
- Remove deprecated MLlib (not Spark ML) linear method support, mostly utility constructors and 'train' methods, and associated docs. This includes methods in LinearRegression, LogisticRegression, Lasso, RidgeRegression. These have been deprecated since 2.0.0.
- Remove deprecated Pyspark MLlib linear method support, including LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD
- Remove 'runs' argument in KMeans.train() method, which has been a no-op since 2.0.0
- Remove deprecated ChiSqSelector isSorted protected method
- Remove deprecated 'yarn-cluster' and 'yarn-client' master argument in favor of 'yarn' and deploy mode 'cluster', etc

Notes:

- I was not able to remove deprecated DataFrameReader.json(RDD) in favor of DataFrameReader.json(Dataset); the former was deprecated in 2.2.0, but it is still needed to support Pyspark's .json() method, which can't use a Dataset.
- Looks like SQLContext.createExternalTable was not actually deprecated in Pyspark, but almost certainly was meant to be; Catalog.createExternalTable was.
- I afterwards noted that the toDegrees, toRadians functions were almost removed fully in SPARK-25908, but Felix suggested keeping just the R version as they hadn't been technically deprecated. I'd like to revisit that. Do we really want the inconsistency? I'm not against reverting it again, but then that implies leaving SQLContext.createExternalTable just in Pyspark too, which seems weird.
- I *kept* LogisticRegressionWithSGD, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD in Pyspark, though deprecated, as it is hard to remove them (still used by StreamingLogisticRegressionWithSGD?) and they are not fully removed in Scala. Maybe should not have been deprecated.

### Why are the changes needed?

Deprecated items are easiest to remove in a major release, so we should do so as much as possible for Spark 3. This does not target items deprecated 'recently' as of Spark 2.3, which is only about 18 months old.

### Does this PR introduce any user-facing change?

Yes, in that deprecated items are removed from some public APIs.

### How was this patch tested?

Existing tests.

Closes #25684 from srowen/SPARK-28980.

Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-09 10:19:40 -05:00
zhengruifeng 4664a082c2 [SPARK-28968][ML] Add HasNumFeatures in the scala side
### What changes were proposed in this pull request?
Add HasNumFeatures on the Scala side, with `1 << 18` as the default value.

### Why are the changes needed?
HasNumFeatures is already added on the Python side; it is reasonable to keep them in sync.
I did not find another similar place.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing testsuites

Closes #25671 from zhengruifeng/add_HasNumFeatures.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2019-09-06 11:50:45 +08:00
Sean Owen 36559b6525 [SPARK-28977][DOCS][SQL] Fix DataFrameReader.jdbc docs to document that the partition column can be numeric, date or timestamp type
### What changes were proposed in this pull request?

`DataFrameReader.jdbc()` accepts a partition column that is of numeric, date or timestamp type, according to the implementation in `JDBCRelation.scala`. Update the scaladoc accordingly, to match the documentation in `sql-data-sources-jdbc.md` too.
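
For reference, a sketch of a partitioned read using a timestamp column, following the options documented in `sql-data-sources-jdbc.md` (the connection details and column names are hypothetical):

```python
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")
      .option("dbtable", "events")
      .option("partitionColumn", "created_at")   # numeric, date, or timestamp
      .option("lowerBound", "2019-01-01 00:00:00")
      .option("upperBound", "2019-12-31 23:59:59")
      .option("numPartitions", 8)
      .load())
```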

### Why are the changes needed?

The scaladoc is incorrect.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A

Closes #25687 from srowen/SPARK-28977.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-09-05 18:32:45 +09:00
Sean Owen eb037a8180 [SPARK-28855][CORE][ML][SQL][STREAMING] Remove outdated usages of Experimental, Evolving annotations
### What changes were proposed in this pull request?

The Experimental and Evolving annotations are both (like Unstable) used to express that an API may change. However, there are many things in the code that have been marked that way since even Spark 1.x. Per the dev thread, anything introduced at or before Spark 2.3.0 is pretty much 'stable' in that it would not change without a deprecation cycle. Therefore I'd like to remove most of these annotations, and remove the `:: Experimental ::` scaladoc tag too. And likewise for Python and R.

The changes below can be summarized as:
- Generally, anything introduced at or before Spark 2.3.0 has been unmarked as neither Evolving nor Experimental
- Obviously experimental items like DSv2, Barrier mode, ExperimentalMethods are untouched
- I _did_ unmark a few MLlib classes introduced in 2.4, as I am quite confident they're not going to change (e.g. KolmogorovSmirnovTest, PowerIterationClustering)

It's a big change to review, so I'd suggest scanning the list of _files_ changed to see if any area seems like it should remain partly experimental and examine those.

### Why are the changes needed?

Many of these annotations are incorrect; the APIs are de facto stable. Leaving them also makes legitimate usages of the annotations less meaningful.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #25558 from srowen/SPARK-28855.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-09-01 10:15:00 -05:00
HyukjinKwon 8848af2635 [SPARK-28881][PYTHON][TESTS][FOLLOW-UP] Use SparkSession(SparkContext(...)) to prevent Spark conf from affecting other tests
### What changes were proposed in this pull request?

This PR proposes to match the test with branch-2.4. See https://github.com/apache/spark/pull/25593#discussion_r318109047

It seems that using `SparkSession.builder` with a Spark conf can affect other tests.

### Why are the changes needed?
To match branch-2.4 and make backporting easier.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Test was fixed.

Closes #25603 from HyukjinKwon/SPARK-28881-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-28 10:39:21 +09:00
WeichenXu 7f605f5559 [SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true
### What changes were proposed in this pull request?

Make `spark.sql.crossJoin.enabled` default value true

### Why are the changes needed?

For an implicit cross join, we can set up a watchdog to cancel it if it runs for a long time.
When `spark.sql.crossJoin.enabled` is false, because `CheckCartesianProducts` is implemented at the logical plan stage, it may generate mismatching errors which can confuse end users:
* It is done in the logical phase, so we may fail queries that could be executed via a broadcast join, which is very fast.
* If we move the check to the physical phase, then a query may succeed at first and begin to fail as the table grows (as other people insert data into it). This can be quite confusing.
* The CROSS JOIN syntax doesn't work well if join reorder happens.
* Some non-equi-joins generate plans using a cartesian product, but `CheckCartesianProducts` does not detect them and raises no error.

So, to address this in a simpler way, we turn off this cross-join error by default.

For reference, here are some cases that raise the mismatching error. Given:
```
spark.range(2).createOrReplaceTempView("sm1") // can be broadcast
spark.range(50000000).createOrReplaceTempView("bg1") // cannot be broadcast
spark.range(60000000).createOrReplaceTempView("bg2") // cannot be broadcast
```
1) Some joins could be converted to a broadcast nested loop join, but CheckCartesianProducts raises an error, e.g.
```
select sm1.id, bg1.id from bg1 join sm1 where sm1.id < bg1.id
```
2) Some joins run as a cartesian join, but CheckCartesianProducts does NOT raise an error, e.g.
```
select bg1.id, bg2.id from bg1 join bg2 where bg1.id < bg2.id
```

### Does this PR introduce any user-facing change?

Yes: `spark.sql.crossJoin.enabled` now defaults to true, so implicit cross joins are no longer rejected by default.

### How was this patch tested?

Closes #25520 from WeichenXu123/SPARK-28621.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-27 21:53:37 +08:00
HyukjinKwon 00cb2f99cc [SPARK-28881][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize
### What changes were proposed in this pull request?
This PR proposes to add a test case for:

```bash
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

which can result in partial results (see https://github.com/apache/spark/pull/25593#issuecomment-525153808). This regression was introduced between Spark 2.3 and Spark 2.4, and was accidentally fixed.

### Why are the changes needed?
To prevent the same regression in the future.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Test was added.

Closes #25594 from HyukjinKwon/SPARK-28881.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-27 17:30:06 +09:00
zhengruifeng 573b1cb835 [SPARK-28858][ML][PYSPARK] add tree-based transformation in the py side
### What changes were proposed in this pull request?
Expose the newly added tree-based transformation on the Python side.

### Why are the changes needed?
function parity

### Does this PR introduce any user-facing change?
Yes, adds `setLeafCol` & `getLeafCol` on the Python side.

### How was this patch tested?
added tests & local tests

Closes #25566 from zhengruifeng/py_tree_path.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-08-23 15:18:35 -07:00
heleny fb1f868d4f [SPARK-28776][ML] SparkML Writer gets hadoop conf from session state

### What changes were proposed in this pull request?
The SparkML writer gets the Hadoop conf from the session state instead of the Spark context.

### Why are the changes needed?
Allow for multiple sessions in the same context that have different hadoop configurations.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tested in pyspark.ml.tests.test_persistence.PersistenceTest test_default_read_write

Closes #25505 from helenyugithub/SPARK-28776.

Authored-by: heleny <heleny@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-22 09:27:05 -05:00
darrentirto a787bc2884 [SPARK-28777][PYTHON][DOCS] Fix format_string doc string with the correct parameters
### What changes were proposed in this pull request?
The parameter docstring of the function `format_string` was changed from _col_, _d_ to _format_, _cols_, which is what the actual function declaration states.
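
A quick usage sketch with the corrected parameters, `format_string(format, *cols)`:

```python
from pyspark.sql.functions import format_string

df = spark.createDataFrame([(5, "hello")], ["a", "b"])
df.select(format_string("%d %s", df.a, df.b).alias("v")).show()
# +-------+
# |      v|
# +-------+
# |5 hello|
# +-------+
```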

### Why are the changes needed?
The parameters stated in the documentation were inaccurate.

### Does this PR introduce any user-facing change?
Yes.

**BEFORE**
![before](https://user-images.githubusercontent.com/9700541/63310013-e21a0e80-c2ad-11e9-806b-1d272c5cde12.png)

**AFTER**
![after](https://user-images.githubusercontent.com/9700541/63315812-6b870c00-c2c1-11e9-8165-82782628cd1a.png)

### How was this patch tested?
N/A: documentation only

Closes #25506 from darrentirto/SPARK-28777.

Authored-by: darrentirto <darrentirto@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-08-19 20:44:46 -07:00
HyukjinKwon ef142371e7 [SPARK-28736][SPARK-28735][PYTHON][ML] Fix PySpark ML tests to pass in JDK 11

### What changes were proposed in this pull request?

This PR proposes to fix both tests below:

```
======================================================================
FAIL: test_raw_and_probability_prediction (pyspark.ml.tests.test_algorithms.MultilayerPerceptronClassifierTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/dongjoon/APACHE/spark-master/python/pyspark/ml/tests/test_algorithms.py", line 89, in test_raw_and_probability_prediction
    self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, atol=1E-4))
AssertionError: False is not true
```

```
File "/Users/dongjoon/APACHE/spark-master/python/pyspark/mllib/clustering.py", line 386, in __main__.GaussianMixtureModel
Failed example:
    abs(softPredicted[0] - 1.0) < 0.001
Expected:
    True
Got:
    False
**********************************************************************
File "/Users/dongjoon/APACHE/spark-master/python/pyspark/mllib/clustering.py", line 388, in __main__.GaussianMixtureModel
Failed example:
    abs(softPredicted[1] - 0.0) < 0.001
Expected:
    True
Got:
    False
```

to pass in JDK 11.

The root cause seems to be float values being interpreted differently via Py4J. This issue was also found before, in https://github.com/apache/spark/pull/25132.

When floats are transferred from Python to the JVM, the values are sent as-is. Python floats are not "precise" due to the language's own limitations - https://docs.python.org/3/tutorial/floatingpoint.html.
For some reason, the floats from Python differ between JDK 8 and JDK 11, which is already explicitly not guaranteed.

This seems to be why only some PySpark tests involving floats fail.

So, this PR fixes it by increasing the tolerance in the identified PySpark test cases.

### Why are the changes needed?

To fully support JDK 11. See, for instance, https://github.com/apache/spark/pull/25443 and https://github.com/apache/spark/pull/25423 for ongoing efforts.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested as described in JIRAs:

```
$ build/sbt -Phadoop-3.2 test:package
$ python/run-tests --testnames 'pyspark.ml.tests.test_algorithms' --python-executables python
```

```
$ build/sbt -Phadoop-3.2 test:package
$ python/run-tests --testnames 'pyspark.mllib.clustering' --python-executables python
```

Closes #25475 from HyukjinKwon/SPARK-28735.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-16 19:47:29 +09:00
Huaxin Gao ba5ee27706 [SPARK-28243][PYSPARK][ML][FOLLOW-UP] Move Python DecisionTreeParams to regression.py
## What changes were proposed in this pull request?
Leave ```shared.py``` untouched. Move Python ```DecisionTreeParams``` to ```regression.py```

## How was this patch tested?
Use existing tests

Closes #25406 from huaxingao/spark-28243.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-15 10:21:26 -05:00
Liang-Chi Hsieh e6a0385289 [SPARK-28422][SQL][PYTHON] GROUPED_AGG pandas_udf should work without group by clause
## What changes were proposed in this pull request?

A GROUPED_AGG pandas Python UDF can't work without a GROUP BY clause, as in `select udf(id) from table`.

This doesn't match aggregate functions like sum, count, etc., nor the Dataset API, e.g. `df.agg(udf(df['id']))`.

When we parse a UDF (or an aggregate function) like that from SQL syntax, it is treated as a function in a project. The `GlobalAggregates` rule in analysis turns such a project into an aggregate by looking for aggregate expressions. At that moment, we should also look for GROUPED_AGG pandas Python UDFs.
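
A sketch of the now-working pattern, assuming the grouped-agg UDF is registered for SQL use (previously this query failed without a GROUP BY):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

spark.udf.register("mean_udf", mean_udf)
spark.range(10).createOrReplaceTempView("table")
spark.sql("SELECT mean_udf(id) FROM table").show()  # whole-table aggregate, no GROUP BY
```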

## How was this patch tested?

Added tests.

Closes #25352 from viirya/SPARK-28422.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-14 00:32:33 +09:00
Gengliang Wang 48adc91057 [SPARK-28698][SQL] Support user-specified output schema in to_avro
## What changes were proposed in this pull request?

The mapping of Spark schema to Avro schema is many-to-many. (See https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion)
The default schema mapping might not be exactly what users want. For example, by default, a "string" column is always written as the "string" Avro type, but users might want to output the column as the "enum" Avro type.
With PR https://github.com/apache/spark/pull/21847, Spark supports user-specified schema in the batch writer.
For the function `to_avro`, we should support user-specified output schema as well.
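
A sketch of the intended usage, assuming the PySpark wrapper `pyspark.sql.avro.functions.to_avro` exposes the same schema argument: writing a string column as an Avro enum.

```python
from pyspark.sql.avro.functions import to_avro

enum_schema = """{"type": "enum", "name": "Suit",
                  "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}"""
df = spark.createDataFrame([("SPADES",)], ["suit"])
df.select(to_avro(df.suit, enum_schema).alias("avro")).show()
```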

## How was this patch tested?

Unit test.

Closes #25419 from gengliangwang/to_avro.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-13 20:52:16 +08:00
wuyi cbad616d4c [SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
## What changes were proposed in this pull request?

In this PR, we implement a complete process of GPU-aware resource scheduling
in Standalone. The whole process looks like this: a Worker sets up isolated resources
when it starts up and registers with the master along with its resources. The Master
then picks usable workers according to the driver/executor's resource requirements and
launches the driver/executor on them. The Worker launches the driver/executor after
preparing a resources file, created under the driver/executor's working directory,
with the specified resource addresses (told by the master). When the driver/executor
finishes, its resources can be recycled back to the worker. Finally, if a worker stops,
it should always release its resources first.

For the case where Workers and Drivers in **client** mode run on the same host, we introduce
a config option named `spark.resources.coordinate.enable` (default true) to indicate
whether Spark should coordinate resources for the user. If `spark.resources.coordinate.enable=false`, the user is responsible for configuring different resources for Workers and Drivers when using a resourcesFile or discovery script. If true, Spark helps the user assign different resources to Workers and Drivers.

The solution for Spark to coordinate resources among Workers and Drivers is:

Generally, use a shared file named *____allocated_resources____.json* to sync allocated
resources info among Workers and Drivers on the same host.

After a Worker or Driver has found all resources using the configured resourcesFile and/or
discovery script during launch, it should filter out available resources by excluding resources already allocated in *____allocated_resources____.json*, and acquire resources from the available ones according to its own requirements. After that, it should write its allocated resources along with its process id (pid) into *____allocated_resources____.json*. The pid (proposed by tgravescs) is used here to check whether the allocated resources are still valid in case a Worker or Driver crashes and doesn't release resources properly. When a Worker or Driver finishes, normally it always cleans up its own allocated resources in *____allocated_resources____.json*.

Note that we'll always take a file lock before any access to the file *____allocated_resources____.json*
and release the lock at the end.

Furthermore, we appended resources info in `WorkerSchedulerStateResponse` to work
around master change behaviour in HA mode.

## How was this patch tested?

Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.

Manually tested in client/cluster mode (e.g. with multiple workers) on a single-node Standalone deployment.

Closes #25047 from Ngone51/SPARK-27371.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-08-09 07:49:03 -05:00
Shixiong Zhu 5bb69945e4 [SPARK-28651][SS] Force the schema of Streaming file source to be nullable
## What changes were proposed in this pull request?

Right now, a batch DataFrame always changes its schema to nullable automatically (see this line: 325bc8e9c6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (L399)). But the streaming file source is missing this.

This PR updates the streaming file source schema to force it to be nullable. I also added a flag `spark.sql.streaming.fileSource.schema.forceNullable` to disable this change, since some users may rely on the old behavior.

## How was this patch tested?

The new unit test.

Closes #25382 from zsxwing/SPARK-28651.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-09 18:54:55 +09:00
Anton Yanchenko bda5b51576 [SPARK-28454][PYTHON] Validate LongType in createDataFrame(verifySchema=True)
## What changes were proposed in this pull request?

Add missing validation for `LongType` in `pyspark.sql.types._make_type_verifier`.

## How was this patch tested?

Doctests / unittests / manual tests.

Unpatched version:
```
In [23]: s.createDataFrame([{'x': 1 << 64}], StructType([StructField('x', LongType())])).collect()
Out[23]: [Row(x=None)]
```

Patched:
```
In [5]: s.createDataFrame([{'x': 1 << 64}], StructType([StructField('x', LongType())])).collect()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-5-c1740fcadbf9> in <module>
----> 1 s.createDataFrame([{'x': 1 << 64}], StructType([StructField('x', LongType())])).collect()

/usr/local/lib/python3.5/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    689             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    690         else:
--> 691             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    692         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    693         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/lib/python3.5/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    405         # make sure data could consumed multiple times
    406         if not isinstance(data, list):
--> 407             data = list(data)
    408
    409         if schema is None or isinstance(schema, (list, tuple)):

/usr/local/lib/python3.5/site-packages/pyspark/sql/session.py in prepare(obj)
    671
    672             def prepare(obj):
--> 673                 verify_func(obj)
    674                 return obj
    675         elif isinstance(schema, DataType):

/usr/local/lib/python3.5/site-packages/pyspark/sql/types.py in verify(obj)
   1427     def verify(obj):
   1428         if not verify_nullability(obj):
-> 1429             verify_value(obj)
   1430
   1431     return verify

/usr/local/lib/python3.5/site-packages/pyspark/sql/types.py in verify_struct(obj)
   1397             if isinstance(obj, dict):
   1398                 for f, verifier in verifiers:
-> 1399                     verifier(obj.get(f))
   1400             elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
   1401                 # the order in obj could be different than dataType.fields

/usr/local/lib/python3.5/site-packages/pyspark/sql/types.py in verify(obj)
   1427     def verify(obj):
   1428         if not verify_nullability(obj):
-> 1429             verify_value(obj)
   1430
   1431     return verify

/usr/local/lib/python3.5/site-packages/pyspark/sql/types.py in verify_long(obj)
   1356             if obj < -9223372036854775808 or obj > 9223372036854775807:
   1357                 raise ValueError(
-> 1358                     new_msg("object of LongType out of range, got: %s" % obj))
   1359
   1360         verify_value = verify_long

ValueError: field x: object of LongType out of range, got: 18446744073709551616
```

Closes #25117 from simplylizz/master.

Authored-by: Anton Yanchenko <simplylizz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-08 11:47:25 +09:00
wuyi 94499af6f0 [SPARK-28486][CORE][PYTHON] Map PythonBroadcast's data file to a BroadcastBlock to avoid delete by GC
## What changes were proposed in this pull request?

Currently, PythonBroadcast may delete its data file while a Python worker still needs it. This happens because PythonBroadcast overrides the `finalize()` method to delete its data file. So, when GC happens and there are no references to the broadcast variable, it may trigger `finalize()` to delete
the data file. That also means the data behind a Python broadcast variable cannot be deleted when `unpersist()`/`destroy()` is called, but instead relies on GC.

In this PR, we removed the `finalize()` method, and map the PythonBroadcast data file to a BroadcastBlock (which has the same broadcast id as the broadcast variable that wrapped this PythonBroadcast) when the PythonBroadcast is deserialized. As a result, the data file can be deleted just like the other pieces of the broadcast variable when `unpersist()`/`destroy()` is called, and no longer relies on GC.

## How was this patch tested?

Added a Python test, and tested manually(verified create/delete the broadcast block).

Closes #25262 from Ngone51/SPARK-28486.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-05 20:18:53 +09:00
WeichenXu b3394db193 [SPARK-28582][PYTHON] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

This PR picks https://github.com/apache/spark/pull/25315 back up after removing the `Popen.wait` usage, which exists in Python 3 only. I misread the last test results and thought it had passed.

Fix the flaky test DaemonTests.do_termination_test, which fails on Python 3.7, by adding a sleep after the test connects to the daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fails on test "test_termination_sigterm", and we can see the daemon process does not exit.
**After**
Test passed

Closes #25343 from HyukjinKwon/SPARK-28582.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-03 10:31:15 +09:00
Dongjoon Hyun 8ae032d78d Revert "[SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7"
This reverts commit fbeee0c5bc.
2019-08-02 10:14:20 -07:00
Huaxin Gao 660423d717 [SPARK-23469][ML] HashingTF should use corrected MurmurHash3 implementation
## What changes were proposed in this pull request?

Update HashingTF to use the new implementation of MurmurHash3.
Make HashingTF use the old MurmurHash3 when a model from before 3.0 is loaded.

## How was this patch tested?

Changed existing unit tests. Also added one unit test to make sure HashingTF uses the old MurmurHash3 when a model from before 3.0 is loaded.

Closes #25303 from huaxingao/spark-23469.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-08-02 10:53:36 -05:00
WeichenXu fbeee0c5bc [SPARK-28582][PYSPARK] Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7
## What changes were proposed in this pull request?

Fix the flaky test DaemonTests.do_termination_test, which fails on Python 3.7, by adding a sleep after the test connects to the daemon.

## How was this patch tested?

Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fails on test "test_termination_sigterm", and we can see the daemon process does not exit.
**After**
Test passed

Closes #25315 from WeichenXu123/fix_py37_daemon.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 22:07:06 +09:00
HyukjinKwon b8e13b0aea [SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF)
## What changes were proposed in this pull request?

This PR proposes to use `AtomicReference` so that parent and child threads can access the same file block holder.

Python UDF expressions are turned into a plan, which then launches a separate thread to consume the input iterator. In that separate child thread, the iterator calls `InputFileBlockHolder.set` before the parent does, which the parent thread is then unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first, before the parent's thread local is initialized (which is done when `ThreadLocal.get()` is first called), the child thread calls its own `initialValue` to initialize it.

2. After that, the parent calls its own `initialValue` to initialize it at the first call of `ThreadLocal.get()`.

3. Both now hold two different references. Updates in the child aren't reflected in the parent.

This PR fixes it by initializing the parent's thread local with an `AtomicReference` for the file status, so that it can be used in each task and the child thread's updates are reflected.

I also tried to explain this a bit more at https://github.com/apache/spark/pull/24958#discussion_r297203041.

## How was this patch tested?

Manually tested and unittest was added.

Closes #24958 from HyukjinKwon/SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-31 22:40:01 +08:00
WeichenXu a745381b9d [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
## What changes were proposed in this pull request?

Remove the deprecated `ImageSchema.readImages`.
Move some useful methods from the class `ImageSchema` into the class `ImageFileFormat`.

In PySpark, rename the `ImageSchema` class to `ImageUtils`, and keep some useful Python methods in it.

## How was this patch tested?

UT.


Closes #25245 from WeichenXu123/remove_image_schema.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 14:26:18 +09:00
WeichenXu 3b14088541 [SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon
## What changes were proposed in this pull request?

PySpark worker daemon reads from stdin the worker PIDs to kill. 1bb60ab839/python/pyspark/daemon.py (L127)

However, the worker process is forked from the worker daemon process, and we didn't close stdin on the child after the fork. This means the child and the user program can read stdin as well, which blocks the daemon from receiving the PID to kill. This can cause issues because the task reaper might detect that the task was not terminated and eventually kill the JVM.

This PR fixes this by redirecting the standard input of the forked child to devnull.

## How was this patch tested?

Manually test.

In `pyspark`, run:
```
import subprocess
def task(_):
  subprocess.check_output(["cat"])

sc.parallelize(range(1), 1).mapPartitions(task).count()
```

Before:
The job gets stuck; pressing Ctrl+C exits the job, but the Python worker process does not exit.
After:
The job finishes correctly. The "cat" prints nothing (because the dummy stdin is "/dev/null").
The Python worker process exits normally.


Closes #25138 from WeichenXu123/SPARK-26175.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-31 09:10:24 +09:00
zhengruifeng 44c28d7515 [SPARK-28399][ML][PYTHON] implement RobustScaler
## What changes were proposed in this pull request?
Implement `RobustScaler`.
Since the transformation is quite similar to `StandardScaler`, I refactored the transform function so that it can be reused in both scalers.
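
A minimal usage sketch, assuming the Python wrapper mirrors the Scala params (`withCentering`/`withScaling`, with the quantile range defaulting to the 25th-75th percentiles):

```python
from pyspark.ml.feature import RobustScaler
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0]),),
     (Vectors.dense([2.0, 100.0]),),   # outlier in the second feature
     (Vectors.dense([4.0, 2.0]),)], ["features"])

scaler = RobustScaler(inputCol="features", outputCol="scaled",
                      withCentering=True, withScaling=True)
scaler.fit(df).transform(df).show(truncate=False)
```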

## How was this patch tested?
existing and added tests

Closes #25160 from zhengruifeng/robust_scaler.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-30 10:24:33 -05:00
Maxim Gekk a5a5da78cf [SPARK-28471][SQL] Replace yyyy by uuuu in date-timestamp patterns without era
## What changes were proposed in this pull request?

In the PR, I propose to use `uuuu` for years instead of `yyyy` in date/timestamp patterns without the era pattern `G` (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). **Parsing/formatting of positive years (current era) will be the same.** The difference is in formatting negative years, which belong to the previous era, BC (Before Christ).

I replaced the `yyyy` pattern by `uuuu` everywhere except:
1. Test, Suite & Benchmark. Existing tests must work as is.
2. `SimpleDateFormat` because it doesn't support the `uuuu` pattern.
3. Comments and examples (except comments related to already replaced patterns).

Before the changes, the common-era year `100` and the BC-era year `-99` were both shown as `100`. After the changes, negative years are formatted with the `-` sign.

Before:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+----------+
|     value|
+----------+
|0100-01-01|
+----------+
```

After:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+-----------+
|      value|
+-----------+
|-0099-01-01|
+-----------+
```

## How was this patch tested?

By existing test suites, and added tests for negative years to `DateFormatterSuite` and `TimestampFormatterSuite`.

Closes #25230 from MaxGekk/year-pattern-uuuu.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-28 20:36:36 -07:00
Huaxin Gao 70f82fd298 [SPARK-21481][ML] Add indexOf method in ml.feature.HashingTF
## What changes were proposed in this pull request?

Add an `indexOf` method to `ml.feature.HashingTF`.
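
A minimal sketch, assuming the Python wrapper gains the matching method:

```python
from pyspark.ml.feature import HashingTF

hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=1024)
print(hashingTF.indexOf("spark"))  # hash bucket index assigned to the term
```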

## How was this patch tested?

Added a unit test.

Closes #25250 from huaxingao/spark-21481.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-28 08:32:43 -05:00
wangguangxin.cn fbaa177d2a [MINOR][PYTHON] Use _memory_limit to get worker memory conf in rdd.py
## What changes were proposed in this pull request?

Replace duplicated code with the function `_memory_limit`.

## How was this patch tested?

Existing UTs

Closes #25273 from WangGuangxin/python_memory_limit.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-27 11:58:50 -07:00
Huaxin Gao 3de4e1b9b4 [SPARK-28507][ML][PYSPARK] Remove deprecated API context(self, sqlContext) from pyspark/ml/util.py
## What changes were proposed in this pull request?

Remove the deprecated ```def context(self, sqlContext)``` from ```pyspark/ml/util.py```.

## How was this patch tested?
Tested with the existing ML PySpark test suites.

Closes #25246 from huaxingao/spark-28507.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-26 12:12:11 -05:00
Huaxin Gao 72c80ee81c [SPARK-28243][PYSPARK][ML] Remove setFeatureSubsetStrategy and setSubsamplingRate from Python TreeEnsembleParams
## What changes were proposed in this pull request?

Remove deprecated setFeatureSubsetStrategy and setSubsamplingRate from Python TreeEnsembleParams

## How was this patch tested?

Use existing tests.

Closes #25046 from huaxingao/spark-28243.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-20 10:44:33 -05:00
zero323 a0c2fa63ab [SPARK-28439][PYTHON][SQL] Add support for count: Column in array_repeat
## What changes were proposed in this pull request?

This adds a simple check for the `count` argument (see the sketch after the list):

- If it is a `Column`, we apply `_to_java_column` before invoking the JVM counterpart.
- Otherwise, we proceed as before.
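
A short sketch of the new `Column` case:

```python
from pyspark.sql.functions import array_repeat, col

df = spark.createDataFrame([("ab", 3)], ["data", "n"])
df.select(array_repeat(col("data"), col("n")).alias("r")).show()  # count as a Column
# +------------+
# |           r|
# +------------+
# |[ab, ab, ab]|
# +------------+
```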

## How was this patch tested?

Manual testing.

Closes #25193 from zero323/SPARK-28278.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-07-18 12:58:48 -07:00
Huaxin Gao 971e832e0e [SPARK-28411][PYTHON][SQL] InsertInto with overwrite is not honored
## What changes were proposed in this pull request?
In the following Python code
```
df.write.mode("overwrite").insertInto("table")
```
```insertInto``` ignores ```mode("overwrite")```  and appends by default.

## How was this patch tested?

Add Unit test.

Closes #25175 from huaxingao/spark-28411.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-18 13:37:59 +09:00
Maxim Gekk 70073b19eb [SPARK-27609][PYTHON] Convert values of function options to strings
## What changes were proposed in this pull request?

In the PR, I propose to convert option values to strings by using `to_str()` for the following functions: `from_csv()`, `to_csv()`, `from_json()`, `to_json()`, `schema_of_csv()` and `schema_of_json()`. This makes the handling of function options consistent with option handling in `DataFrameReader`/`DataFrameWriter`.

For example:
```Python
df.select(from_csv(df.value, "s string", {'ignoreLeadingWhiteSpace': True}))
```

## How was this patch tested?

Added an example for `from_csv()` which was tested by:
```Shell
./python/run-tests --testnames pyspark.sql.functions
```

Closes #25182 from MaxGekk/options_to_str.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-18 13:37:03 +09:00
HyukjinKwon 66179fa842 [SPARK-28418][PYTHON][SQL] Wait for event process in 'test_query_execution_listener_on_collect'
## What changes were proposed in this pull request?

It fixes a flaky test:

```
ERROR [0.164s]: test_query_execution_listener_on_collect (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/python/pyspark/sql/tests/test_dataframe.py", line 758, in test_query_execution_listener_on_collect
    "The callback from the query execution listener should be called after 'collect'")
AssertionError: The callback from the query execution listener should be called after 'collect'
```

It seems it can fail because the event was somehow delayed but checked first.

## How was this patch tested?

Manually.

Closes #25177 from HyukjinKwon/SPARK-28418.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-17 18:44:11 +09:00
Liang-Chi Hsieh 591de42351 [SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
## What changes were proposed in this pull request?

This upgrades to a newer version of Pyrolite. Most updates [1] in the newer version are for .NET. For Java, it includes a bug fix to Unpickler regarding cleaning up the Unpickler memo, and support for protocol 5.

After upgrading, we can remove the fix at SPARK-27629 for the bug in Unpickler.

[1] https://github.com/irmen/Pyrolite/compare/pyrolite-4.23...master

## How was this patch tested?

Manually tested on Python 3.6 in local on existing tests.

Closes #25143 from viirya/upgrade-pyrolite.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-15 12:29:58 +09:00
Liang-Chi Hsieh 707411f479 [SPARK-28378][PYTHON] Remove usage of cgi.escape
## What changes were proposed in this pull request?

`cgi.escape` is deprecated [1] and removed in Python 3.8 [2]. We had better replace it.

[1] https://docs.python.org/3/library/cgi.html#cgi.escape.
[2] https://docs.python.org/3.8/whatsnew/3.8.html#api-and-feature-removals
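
The standard-library replacement is `html.escape`; a small sketch of the behavioral difference to keep in mind:

```python
import html

# html.escape escapes quotes by default (quote=True); cgi.escape did not.
html.escape('<a href="x">&</a>')
# '&lt;a href=&quot;x&quot;&gt;&amp;&lt;/a&gt;'
html.escape('<a href="x">&</a>', quote=False)  # cgi.escape-compatible output
# '&lt;a href="x"&gt;&amp;&lt;/a&gt;'
```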

## How was this patch tested?

Existing tests.

Closes #25142 from viirya/remove-cgi-escape.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-14 15:26:00 +09:00
Jesse Cai 79e2047703 [SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which command is compressed by broadcast
## What changes were proposed in this pull request?

The `_prepare_for_python_RDD` method currently broadcasts a pickled command if its length is greater than the hardcoded value `1 << 20` (1M). This change makes the threshold a Spark conf instead.

## How was this patch tested?

Unit tests, manual tests.

Closes #25123 from jessecai/SPARK-28355.

Authored-by: Jesse Cai <jesse.cai@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-07-13 08:44:16 -07:00
Thomas Graves f84cca2d84 [SPARK-28234][CORE][PYTHON] Add python and JavaSparkContext support to get resources
## What changes were proposed in this pull request?

Add Python API support and JavaSparkContext support for resources(). The JavaSparkContext support is needed for it to translate properly into Python via the Py4J layer.

## How was this patch tested?

Unit tests added and manually tested in local cluster mode and on yarn.

Closes #25087 from tgravescs/SPARK-28234-python.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-11 09:32:58 +09:00
Liang-Chi Hsieh 7858e534d3 [SPARK-28323][SQL][PYTHON] PythonUDF should be able to use in join condition
## What changes were proposed in this pull request?

There is a bug in `ExtractPythonUDFs` that produces wrong result attributes. It causes a failure when using `PythonUDF`s among multiple child plans, e.g. a join. An example is using `PythonUDF`s in a join condition.

```python
>>> left = spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
>>> right = spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
>>> f = udf(lambda a: a, IntegerType())
>>> df = left.join(right, [f("a") == f("b"), left.a1 == right.b1])
>>> df.collect()
19/07/10 12:20:49 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70)
        ...
```

## How was this patch tested?

Added test.

Closes #25091 from viirya/SPARK-28323.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-07-10 16:29:58 -07:00
Henry D a32c92c0cd [SPARK-28140][MLLIB][PYTHON] Accept DataFrames in RowMatrix and IndexedRowMatrix constructors
## What changes were proposed in this pull request?

In both cases, the input `DataFrame` schema must contain only the information that's required for the matrix object: a vector column in the case of `RowMatrix`, and long and vector columns for `IndexedRowMatrix`.
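
A minimal sketch of the `RowMatrix` case (the `IndexedRowMatrix` case additionally needs a long index column); the column name is illustrative:

```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0]),), (Vectors.dense([3.0, 4.0]),)], ["vector"])
mat = RowMatrix(df)  # previously only an RDD of vectors was accepted
print(mat.numRows(), mat.numCols())  # 2 2
```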

## How was this patch tested?

Unit tests that verify:
- `RowMatrix` and `IndexedRowMatrix` can be created from `DataFrame`s
- If the schema does not match expectations, we throw an `IllegalArgumentException`

Closes #24953 from henrydavidge/row-matrix-df.

Authored-by: Henry D <henrydavidge@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-07-09 16:39:21 -05:00
HyukjinKwon fe3e34dda6 [SPARK-28273][SQL][PYTHON] Convert and port 'pgSQL/case.sql' into UDF test base
## What changes were proposed in this pull request?

This PR adds some tests converted from `pgSQL/case.sql` to test UDFs. Please see the contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

This PR also contains two minor fixes:

1. Change the name of the Scala UDF from `UDF:name(...)` to `name(...)` to be consistent with Python's.

2. Fix Scala UDF at `IntegratedUDFTestUtils.scala` to handle `null` in strings.

<details><summary>Diff comparing to 'pgSQL/case.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
index fa078d16d6d..55bef64338f 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
@@ -115,7 +115,7 @@ struct<>
 -- !query 13
 SELECT '3' AS `One`,
   CASE
-    WHEN 1 < 2 THEN 3
+    WHEN CAST(udf(1 < 2) AS boolean) THEN 3
   END AS `Simple WHEN`
 -- !query 13 schema
 struct<One:string,Simple WHEN:int>
@@ -126,10 +126,10 @@ struct<One:string,Simple WHEN:int>
 -- !query 14
 SELECT '<NULL>' AS `One`,
   CASE
-    WHEN 1 > 2 THEN 3
+    WHEN 1 > 2 THEN udf(3)
   END AS `Simple default`
 -- !query 14 schema
-struct<One:string,Simple default:int>
+struct<One:string,Simple default:string>
 -- !query 14 output
 <NULL> NULL

@@ -137,17 +137,17 @@ struct<One:string,Simple default:int>
 -- !query 15
 SELECT '3' AS `One`,
   CASE
-    WHEN 1 < 2 THEN 3
-    ELSE 4
+    WHEN udf(1) < 2 THEN udf(3)
+    ELSE udf(4)
   END AS `Simple ELSE`
 -- !query 15 schema
-struct<One:string,Simple ELSE:int>
+struct<One:string,Simple ELSE:string>
 -- !query 15 output
 3      3

 -- !query 16
-SELECT '4' AS `One`,
+SELECT udf('4') AS `One`,
   CASE
     WHEN 1 > 2 THEN 3
     ELSE 4
@@ -159,10 +159,10 @@ struct<One:string,ELSE default:int>

 -- !query 17
-SELECT '6' AS `One`,
+SELECT udf('6') AS `One`,
   CASE
-    WHEN 1 > 2 THEN 3
-    WHEN 4 < 5 THEN 6
+    WHEN CAST(udf(1 > 2) AS boolean) THEN 3
+    WHEN udf(4) < 5 THEN 6
     ELSE 7
   END AS `Two WHEN with default`
 -- !query 17 schema
@@ -173,7 +173,7 @@ struct<One:string,Two WHEN with default:int>

 -- !query 18
 SELECT '7' AS `None`,
-  CASE WHEN rand() < 0 THEN 1
+  CASE WHEN rand() < udf(0) THEN 1
   END AS `NULL on no matches`
 -- !query 18 schema
 struct<None:string,NULL on no matches:int>
@@ -182,36 +182,36 @@ struct<None:string,NULL on no matches:int>

 -- !query 19
-SELECT CASE WHEN 1=0 THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
+SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
 -- !query 19 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN CAST(udf((1 = 0)) AS BOOLEAN) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
 -- !query 19 output
 1.0

 -- !query 20
-SELECT CASE 1 WHEN 0 THEN 1/0 WHEN 1 THEN 1 ELSE 2/0 END
+SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END
 -- !query 20 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(CAST(udf(0) AS DOUBLE) AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
 -- !query 20 output
 1.0

 -- !query 21
-SELECT CASE WHEN i > 100 THEN 1/0 ELSE 0 END FROM case_tbl
+SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl
 -- !query 21 schema
-struct<CASE WHEN (i > 100) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) ELSE CAST(0 AS DOUBLE) END:double>
+struct<CASE WHEN (i > 100) THEN udf((cast(1 as double) / cast(0 as double))) ELSE udf(0) END:string>
 -- !query 21 output
-0.0
-0.0
-0.0
-0.0
+0
+0
+0
+0

 -- !query 22
-SELECT CASE 'a' WHEN 'a' THEN 1 ELSE 2 END
+SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END
 -- !query 22 schema
-struct<CASE WHEN (a = a) THEN 1 ELSE 2 END:int>
+struct<CASE WHEN (a = a) THEN udf(1) ELSE udf(2) END:string>
 -- !query 22 output
 1

@@ -283,7 +283,7 @@ big

 -- !query 27
-SELECT * FROM CASE_TBL WHERE COALESCE(f,i) = 4
+SELECT * FROM CASE_TBL WHERE udf(COALESCE(f,i)) = 4
 -- !query 27 schema
 struct<i:int,f:double>
 -- !query 27 output
@@ -291,7 +291,7 @@ struct<i:int,f:double>

 -- !query 28
-SELECT * FROM CASE_TBL WHERE NULLIF(f,i) = 2
+SELECT * FROM CASE_TBL WHERE udf(NULLIF(f,i)) = 2
 -- !query 28 schema
 struct<i:int,f:double>
 -- !query 28 output
@@ -299,10 +299,10 @@ struct<i:int,f:double>

 -- !query 29
-SELECT COALESCE(a.f, b.i, b.j)
+SELECT udf(COALESCE(a.f, b.i, b.j))
   FROM CASE_TBL a, CASE2_TBL b
 -- !query 29 schema
-struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
+struct<udf(coalesce(f, cast(i as double), cast(j as double))):string>
 -- !query 29 output
 -30.3
 -30.3
@@ -332,8 +332,8 @@ struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>

 -- !query 30
 SELECT *
-  FROM CASE_TBL a, CASE2_TBL b
-  WHERE COALESCE(a.f, b.i, b.j) = 2
+   FROM CASE_TBL a, CASE2_TBL b
+   WHERE udf(COALESCE(a.f, b.i, b.j)) = 2
 -- !query 30 schema
 struct<i:int,f:double,i:int,j:int>
 -- !query 30 output
@@ -342,7 +342,7 @@ struct<i:int,f:double,i:int,j:int>

 -- !query 31
-SELECT '' AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
+SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
   NULLIF(b.i, 4) AS `NULLIF(b.i,4)`
   FROM CASE_TBL a, CASE2_TBL b
 -- !query 31 schema
@@ -377,7 +377,7 @@ struct<Five:string,NULLIF(a.i,b.i):int,NULLIF(b.i,4):int>
 -- !query 32
 SELECT '' AS `Two`, *
   FROM CASE_TBL a, CASE2_TBL b
-  WHERE COALESCE(f,b.i) = 2
+  WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean)
 -- !query 32 schema
 struct<Two:string,i:int,f:double,i:int,j:int>
 -- !query 32 output
@@ -388,15 +388,15 @@ struct<Two:string,i:int,f:double,i:int,j:int>
 -- !query 33
 SELECT CASE
   (CASE vol('bar')
-    WHEN 'foo' THEN 'it was foo!'
-    WHEN vol(null) THEN 'null input'
+    WHEN udf('foo') THEN 'it was foo!'
+    WHEN udf(vol(null)) THEN 'null input'
     WHEN 'bar' THEN 'it was bar!' END
   )
-  WHEN 'it was foo!' THEN 'foo recognized'
-  WHEN 'it was bar!' THEN 'bar recognized'
-  ELSE 'unrecognized' END
+  WHEN udf('it was foo!') THEN 'foo recognized'
+  WHEN 'it was bar!' THEN udf('bar recognized')
+  ELSE 'unrecognized' END AS col
 -- !query 33 schema
-struct<CASE WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
+struct<col:string>
 -- !query 33 output
 bar recognized
```

</p>
</details>

https://github.com/apache/spark/pull/25069 contains the same minor fixes, which are required to write these tests.

## How was this patch tested?

Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

Closes #25070 from HyukjinKwon/SPARK-28273.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-07-09 10:50:07 +08:00
HyukjinKwon cdbc30213b [SPARK-28226][PYTHON] Document Pandas UDF mapInPandas
## What changes were proposed in this pull request?

This PR proposes to document `MAP_ITER` with `mapInPandas`.

## How was this patch tested?

Manually checked the documentation.

![Screen Shot 2019-07-05 at 1 52 30 PM](https://user-images.githubusercontent.com/6477701/60698812-26cf2d80-9f2c-11e9-8295-9c00c28f5569.png)

![Screen Shot 2019-07-05 at 1 48 53 PM](https://user-images.githubusercontent.com/6477701/60698710-ac061280-9f2b-11e9-8521-a4f361207e06.png)

Closes #25025 from HyukjinKwon/SPARK-28226.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-07 09:07:52 +09:00
HyukjinKwon fe75ff8bea [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation
## What changes were proposed in this pull request?

It seems we generated PySpark API documentation with Epydoc almost from the very beginning (see 85b8f2c64f).

This fixes an actual issue:

Before:

![Screen Shot 2019-07-05 at 8 20 01 PM](https://user-images.githubusercontent.com/6477701/60720491-e9879180-9f65-11e9-9562-100830a456cd.png)

After:

![Screen Shot 2019-07-05 at 8 20 05 PM](https://user-images.githubusercontent.com/6477701/60720495-ec828200-9f65-11e9-8277-8f689e292cb0.png)

It appears to be a bug in the `epytext` plugin during the conversion between `param` and `:param` syntax. See also [Epydoc syntax](http://epydoc.sourceforge.net/manual-epytext.html).

Actually, Epydoc syntax violates [PEP-257](https://www.python.org/dev/peps/pep-0257/) IIRC, and blocks us from enabling some rules for the doctest linter as well.
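
For reference, an illustrative comparison of the two docstring styles (not code from this PR):

```python
def epydoc_style(x):
    """Legacy Epydoc/epytext markup.

    @param x: an input value
    @return: the value unchanged
    """
    return x


def sphinx_style(x):
    """reST/Sphinx markup, which plays well with PEP-257 and doctest linters.

    :param x: an input value
    :return: the value unchanged
    """
    return x
```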

We should remove this legacy, and Spark 3 is a good time to do it.

## How was this patch tested?

Manually built the doc and check each.

I had to manually find the Epydoc syntax by `git grep -r "{L"`, for instance.

Closes #25060 from HyukjinKwon/SPARK-28206.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-07-05 10:08:22 -07:00
HyukjinKwon 5c55812400 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
## What changes were proposed in this pull request?

This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type.

Had an offline discussion with rxin, mengxr and cloud-fan

The reason is basically:

1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. A single Pandas UDF cannot be shared, for instance, between `select` and `mapPartitionsInPandas`, unlike `GROUPED_AGG`, because the iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764

Renaming `SCALAR_ITER` to `MAP_ITER` was abandoned due to reason 2.

For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic to `mapPartitionsInPandas`.

## How was this patch tested?

Existing tests should cover.

Closes #25044 from HyukjinKwon/SPARK-28198.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-05 09:22:41 +09:00
HyukjinKwon 5f7aceb9df [SPARK-28240][PYTHON] Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark
## What changes were proposed in this pull request?

In Python 2.7 with the latest PyArrow and Pandas, the error message is a bit different from Python 3's. This PR simply fixes the test.

```
======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.ArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
    self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"

======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
    self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"

```

## How was this patch tested?

Manually tested.

```
cd python
./run-tests --python-executables=python --modules pyspark-sql
```

Closes #25042 from HyukjinKwon/SPARK-28240.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-03 17:46:31 +09:00
HyukjinKwon 02f4763286 [SPARK-28198][PYTHON] Add mapPartitionsInPandas to allow an iterator of DataFrames
## What changes were proposed in this pull request?

This PR proposes to add a `mapPartitionsInPandas` API to DataFrame by using the existing `SCALAR_ITER`, as below:

1. Filtering via setting the column

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

@pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapPartitionsInPandas(filter_func).show()
```

```
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+
```

2. `DataFrame.loc`

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df = spark.createDataFrame([['aa'], ['bb'], ['cc'], ['aa'], ['aa'], ['aa']], ["value"])

@pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        yield pdf.loc[pdf.value.str.contains('^a'), :]

df.mapPartitionsInPandas(filter_func).show()
```

```
+-----+
|value|
+-----+
|   aa|
|   aa|
|   aa|
|   aa|
+-----+
```

3. `pandas.melt`

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df = spark.createDataFrame(
    pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                  'B': {0: 1, 1: 3, 2: 5},
                  'C': {0: 2, 1: 4, 2: 6}}))

@pandas_udf("A string, variable string, value long", PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
    for pdf in iterator:
        import pandas as pd
        yield pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])

df.mapPartitionsInPandas(filter_func).show()
```

```
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+
```

The current limitation of `SCALAR_ITER` is that it doesn't allow results of a different length, which is pretty critical in practice - for instance, we cannot simply filter by using Pandas APIs; we merely map N to N. This PR allows mapping N to M, like flatMap.

This API mimics `mapPartitions` but keeps the API shape of `SCALAR_ITER` while allowing results of a different length.

### How does this PR implement it?

This PR mimics both `dapply` with Arrow optimization and Grouped Map Pandas UDFs. On the Python execution side, it reuses the existing `SCALAR_ITER` code path.

Therefore, externally, we don't introduce any new type of Pandas UDF but internally we use another evaluation type code `205` (`SQL_MAP_PANDAS_ITER_UDF`).

This approach is similar to the window function implementation with Grouped Aggregation Pandas UDFs - internally we have `203` (`SQL_WINDOW_AGG_PANDAS_UDF`) but externally we just share the same `GROUPED_AGG`.

## How was this patch tested?

Manually tested and unittests were added.

Closes #24997 from HyukjinKwon/scalar-udf-iter.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-02 10:54:16 +09:00
Marco Gaido 048224ce9a [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation
## What changes were proposed in this pull request?

The documentation in `linalg.py` is not consistent. This PR makes it uniform.

## How was this patch tested?

NA

Closes #25011 from mgaido91/SPARK-28170.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-01 11:40:12 +09:00
Xiangrui Meng 8299600575 [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
## What changes were proposed in this pull request?

Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not guaranteed. I will submit another PR to add the same to user guide, just to keep this PR minimal.

I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.

cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123

![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)

## How was this patch tested?

doctest

Closes #25005 from mengxr/SPARK-28056.2.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-28 15:09:57 -07:00
WeichenXu 31e7c37354 [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
## What changes were proposed in this pull request?

Closes the generator when Python UDFs stop early.

### Manually verification on pandas iterator UDF and mapPartitions

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext
import time
import os

spark = SparkSession.builder.getOrCreate()

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    try:
        for batch in it:
            yield batch + 100
            time.sleep(1.0)
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000001.tmp", "a").close()

df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000001.tmp" generated.
df1.select(col('a'), fi1('a')).limit(2).collect()

def mapper(it):
    try:
        for batch in it:
            yield batch
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000002.tmp", "a").close()

df2 = spark.range(10000000).repartition(1)

# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000002.tmp" generated.
df2.rdd.mapPartitions(mapper).take(2)

```

## How was this patch tested?

Unit test added.

Closes #24986 from WeichenXu123/pandas_iter_udf_limit.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-28 17:10:25 +09:00
Bryan Cutler c277afb12b [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors
## What changes were proposed in this pull request?

Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if the Spark job being run in the background serving thread errors, it will be caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it only catches a SparkException; it won't handle an error that occurs in the serializer, and each method has to have its own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving port and authentication info, so that the Python caller can join with the serving thread. During the call to join, the serving thread's Future is completed either successfully or with an exception. In the latter case, the exception is propagated to Python through the Py4j call.

## How was this patch tested?

Existing tests

Closes #24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-26 13:05:41 -07:00
zhengruifeng c397b06924 [SPARK-28045][ML][PYTHON] add missing RankingEvaluator
## What changes were proposed in this pull request?

Add the missing RankingEvaluator.
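
A hedged usage sketch (the data is illustrative; `prediction` and `label` are arrays of item ids as doubles):

```python
from pyspark.ml.evaluation import RankingEvaluator

df = spark.createDataFrame(
    [([1.0, 2.0, 3.0], [1.0, 3.0]), ([4.0, 5.0], [4.0, 6.0])],
    ["prediction", "label"])
evaluator = RankingEvaluator(metricName="meanAveragePrecision")
print(evaluator.evaluate(df))
```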

## How was this patch tested?

Added test suites.

Closes #24869 from zhengruifeng/ranking_eval.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-25 06:44:06 -05:00
Li Jin d0fbc4da3b [SPARK-28003][PYTHON] Allow NaT values when creating Spark dataframe from pandas with Arrow
## What changes were proposed in this pull request?

This patch removes `fillna(0)` when creating ArrowBatch from a pandas Series.

With `fillna(0)`, the original code would turn a timestamp type into object type, which pyarrow will complain about later:
```
>>> import pandas as pd
>>> s = pd.Series([pd.NaT, pd.Timestamp('2015-01-01')])
>>> s.dtypes
dtype('<M8[ns]')
>>> s.fillna(0)
0                      0
1    2015-01-01 00:00:00
dtype: object
```

## How was this patch tested?

Added `test_timestamp_nat`

Closes #24844 from icexelloss/SPARK-28003-arrow-nat.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-24 11:15:21 -07:00
HyukjinKwon 7c05f61514 [SPARK-28130][PYTHON] Print pretty messages for skipped tests when xmlrunner is available in PySpark
## What changes were proposed in this pull request?

Currently, the pretty skipped-test message mechanism added by f7435bec6a does not seem to work when xmlrunner is installed.

This PR fixes two things:

1. When `xmlrunner` is installed, it seems `xmlrunner` does not respect the `verbosity` level in unittests (default is level 1).

    So the output looks as below

    ```
    Running tests...
     ----------------------------------------------------------------------
    SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS
    ----------------------------------------------------------------------
    ```

    So it is not caught by our message detection mechanism.

2. If we manually set the `verbosity` level on `xmlrunner`, it prints messages as below:

    ```
    test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
    test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
    ...
    ```

    This is different in our Jenkins machine:

    ```
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
    ...
    ```

    Note that the last `SKIP` is different. This PR fixes the regular expression to catch the `SKIP` case as well.
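
For illustration, a sketch of setting the verbosity explicitly on `xmlrunner` (an assumption about the shape of the fix, not the exact change):

```python
import unittest
import xmlrunner

# with verbosity >= 2, xmlrunner prints one line per test, including
# "... SKIP (0.000s)" entries that the message detection can parse
unittest.main(
    testRunner=xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2))
```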

## How was this patch tested?

Manually tested.

**Before:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (0s)
...
Tests passed in 562 seconds

========================================================================
...
```

**After:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (48s) ... 93 tests were skipped
...
Tests passed in 560 seconds

Skipped tests pyspark.... with python2.7:
      pyspark...(...) ... SKIP (0.000s)
...

========================================================================
...
```

Closes #24927 from HyukjinKwon/SPARK-28130.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-24 09:58:17 +09:00
Bryan Cutler 5ad1053f3e [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
## What changes were proposed in this pull request?

When running `FlatMapGroupsInPandasExec` or `AggregateInPandasExec`, the shuffle uses the default of 200 partitions from `spark.sql.shuffle.partitions`. If the data is small, e.g. in testing, many of the partitions will be empty but are treated just the same.

This PR checks the `mapPartitionsInternal` iterator to be non-empty before calling `ArrowPythonRunner` to start computation on the iterator.

## How was this patch tested?

Existing tests. Ran the following benchmark, a simple example where most partitions are empty:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(
     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
     ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).count()
```

**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

Closes #24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-22 11:20:35 +09:00
HyukjinKwon 113f8c8d13 [SPARK-28132][PYTHON] Update document type conversion for Pandas UDFs (pyarrow 0.13.0, pandas 0.24.2, Python 3.7)
## What changes were proposed in this pull request?

This PR updates the chart generated at SPARK-25666. We deprecated Python 2. It's better to use Python 3.

We don't have to test `unicode` and `long` anymore in Python 3, so they were removed.

Use this code to generate the chart:

```python
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf

columns = [
    ('none', 'object(NoneType)'),
    ('bool', 'bool'),
    ('int8', 'int8'),
    ('int16', 'int16'),
    ('int32', 'int32'),
    ('int64', 'int64'),
    ('uint8', 'uint8'),
    ('uint16', 'uint16'),
    ('uint32', 'uint32'),
    ('uint64', 'uint64'),
    ('float16', 'float16'),
    ('float32', 'float32'),
    ('float64', 'float64'),
    ('date', 'datetime64[ns]'),
    ('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
    ('string', 'object(string)'),
    ('decimal', 'object(Decimal)'),
    ('array', 'object(array[int32])'),
    ('float128', 'float128'),
    ('complex64', 'complex64'),
    ('complex128', 'complex128'),
    ('category', 'category'),
    ('tdeltas', 'timedelta64[ns]'),
]

def create_dataframe():
    import pandas as pd
    import numpy as np
    import decimal
    pdf = pd.DataFrame({
        'none': [None, None],
        'bool': [True, False],
        'int8': np.arange(1, 3).astype('int8'),
        'int16': np.arange(1, 3).astype('int16'),
        'int32': np.arange(1, 3).astype('int32'),
        'int64': np.arange(1, 3).astype('int64'),
        'uint8': np.arange(1, 3).astype('uint8'),
        'uint16': np.arange(1, 3).astype('uint16'),
        'uint32': np.arange(1, 3).astype('uint32'),
        'uint64': np.arange(1, 3).astype('uint64'),
        'float16': np.arange(1, 3).astype('float16'),
        'float32': np.arange(1, 3).astype('float32'),
        'float64': np.arange(1, 3).astype('float64'),
        'float128': np.arange(1, 3).astype('float128'),
        'complex64': np.arange(1, 3).astype('complex64'),
        'complex128': np.arange(1, 3).astype('complex128'),
        'string': list('ab'),
        'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]),
        'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
        'date': pd.date_range('19700101', periods=2).values,
        'category': pd.Series(list("AB")).astype('category')})
    pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
    pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern')
    return pdf

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    FloatType(),
    DoubleType(),
    DateType(),
    TimestampType(),
    StringType(),
    DecimalType(10, 0),
    ArrayType(IntegerType()),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
    BinaryType(),
]

df = spark.range(2).repartition(1)
results = []
count = 0
total = len(types) * len(columns)
values = []
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for column, pandas_t in columns:
        v = create_dataframe()[column][0]
        values.append(v)
        try:
            row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), v, pandas_t, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns)))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))
```

## How was this patch tested?

Manually.

Closes #24930 from HyukjinKwon/SPARK-28132.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-21 10:47:54 -07:00
HyukjinKwon 9b9d81b821 [SPARK-28131][PYTHON] Update document type conversion between Python data and SQL types in normal UDFs (Python 3.7)
## What changes were proposed in this pull request?

This PR updates the chart generated at SPARK-25666. We deprecated Python 2. It's better to use Python 3.

We don't have to test `unicode` and `long` anymore in Python 3, so they were removed.

Use this code to generate the chart:

```python
import sys
import array
import datetime
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf

data = [
    None,
    True,
    1,
    "a",
    datetime.date(1970, 1, 1),
    datetime.datetime(1970, 1, 1, 0, 0),
    1.0,
    array.array("i", [1]),
    [1],
    (1,),
    bytearray([65, 66, 67]),
    Decimal(1),
    {"a": 1},
    Row(kwargs=1),
    Row("namedtuple")(1),
]

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    StringType(),
    DateType(),
    TimestampType(),
    FloatType(),
    DoubleType(),
    ArrayType(IntegerType()),
    BinaryType(),
    DecimalType(10, 0),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
]

df = spark.range(1)
results = []
count = 0
total = len(types) * len(data)
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for v in data:
        try:
            row = df.select(udf(lambda: v, t)()).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Python Value: [%s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), str(v), type(v).__name__, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))
```

## How was this patch tested?

Manually.

Closes #24929 from HyukjinKwon/SPARK-28131.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-21 10:27:18 -07:00
tools4origins 25c5d57883 [MINOR][DOC] Fix python variance() documentation
## What changes were proposed in this pull request?

The Python documentation incorrectly says that `variance()` acts as `var_pop`, whereas it actually acts like `var_samp`: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.variance

This was not the case in the Spark 1.6 docs, but it is in the Spark 2.0 docs:
https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/functions.html
https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/functions.html

The Scala documentation is correct: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#variance-org.apache.spark.sql.Column-

The alias is set on this line:
https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L786

## How was this patch tested?

Using `variance()` in pyspark 2.4.3 returns:
```
>>> spark.createDataFrame([(1, ), (2, ), (3, )], "a: int").select(variance("a")).show()
+-----------+
|var_samp(a)|
+-----------+
|        1.0|
+-----------+
```

Closes #24895 from tools4origins/patch-1.

Authored-by: tools4origins <tools4origins@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-20 08:10:19 -07:00
zhengruifeng 9ec049601a [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator support more metrics
## What changes were proposed in this pull request?

Expose more metrics in the evaluator: weightedTruePositiveRate/weightedFalsePositiveRate/weightedFMeasure/truePositiveRateByLabel/falsePositiveRateByLabel/precisionByLabel/recallByLabel/fMeasureByLabel.
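
A hedged sketch of one of the new per-label metrics (data is illustrative):

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df = spark.createDataFrame(
    [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)],
    ["prediction", "label"])
evaluator = MulticlassClassificationEvaluator(
    metricName="truePositiveRateByLabel", metricLabel=1.0)
print(evaluator.evaluate(df))
```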

## How was this patch tested?
Existing cases plus added cases.

Closes #24868 from zhengruifeng/multi_class_support_bylabel.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-19 08:56:15 -05:00
Liang-Chi Hsieh b7bdc3111e [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning
## What changes were proposed in this pull request?

When using `DROPMALFORMED` mode, corrupted records aren't dropped if the malformed columns aren't read. This behavior is due to CSV parser column pruning. The current doc of `DROPMALFORMED` doesn't mention the effect of column pruning, so users are confused when `DROPMALFORMED` mode doesn't work as expected.

Column pruning also affects other modes. This is a doc improvement that adds a note to the doc of `mode` to explain it.
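
An illustrative sketch of the effect (file path and contents are hypothetical):

```python
# suppose /tmp/data.csv contains "0,2013-111-11 12:13:14",
# i.e. the second column holds a malformed timestamp (hypothetical data)
reader = (spark.read
          .schema("a INT, t TIMESTAMP")
          .option("mode", "DROPMALFORMED"))

reader.csv("/tmp/data.csv").show()              # the malformed row is dropped
reader.csv("/tmp/data.csv").select("a").show()  # it may survive: column t is pruned
```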

## How was this patch tested?

N/A. This is just a doc change.

Closes #24894 from viirya/SPARK-28058.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-18 13:48:32 +09:00
Bryan Cutler 90f80395af [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2
## What changes were proposed in this pull request?

This increases the minimum supported version of Pandas to 0.23.2. Using a lower version will raise an error `Pandas >= 0.23.2 must be installed; however, your version was 0.XX`. Also, a workaround for using pyarrow with Pandas 0.19.2 was removed.

## How was this patch tested?

Existing Tests

Closes #24867 from BryanCutler/pyspark-increase-min-pandas-SPARK-28041.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-18 09:10:58 +09:00
WeichenXu 6d441dcdc6 [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series
## What changes were proposed in this pull request?

Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series.
Note the UDF input will always be a single iterator:
* if the udf takes only one column as input, the iterator's elements will be `pd.Series` (each corresponding to a batch of the column's values)
* if the udf takes multiple columns as inputs, the iterator's elements will be tuples of multiple `pd.Series`, each one corresponding to one of the input columns (in the same order). For example:
```
@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch, col2_batch in iterator:
        yield col1_batch + col2_batch

df.select(the_udf("col1", "col2"))
```
The udf above will add col1 and col2.

I haven't added unit tests, but manual tests show it works fine, so it is ready for a first-pass review.
We can test several typical cases:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi1: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 100
    print("DBG: fi1: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi2: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 10000
    print("DBG: fi2: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi3: do init stuff, partitionId=" + str(pid))
    for x, y in it:
        yield x + y * 10 + 100000
    print("DBG: fi3: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 1000

@udf("int")
def fu1(x):
    return x + 10

# test select "pandas iter udf/pandas udf/sql udf" expressions at the same time.
# Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# and `fu1("a")`, `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()

# test chain two pandas iter udf together
# Note this case `fi2(fi1("a"))` will generate only one plan
# Also note the init stuff/close stuff call order will be like:
# (debug output following)
#     DBG: fi2: do init stuff, partitionId=0
#     DBG: fi1: do init stuff, partitionId=0
#     DBG: fi1: do close stuff, partitionId=0
#     DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()

# test more complex chain
# Note this case `fi1("a"), fi2("a")` will generate one plan,
# and `fi3(fi1_output, fi2_output)` will generate another plan
df.select(fi3(fi1("a"), fi2("a"))).show()
```

## How was this patch tested?

To be added.

Closes #24643 from WeichenXu123/pandas_udf_iter.

Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-06-15 08:29:20 -07:00
HyukjinKwon 26998b86c1 [SPARK-27870][SQL][PYTHON] Add a runtime buffer size configuration for Pandas UDFs
## What changes were proposed in this pull request?

This PR is an alternative approach for #24734.

This PR fixes two things:

1. Respects `spark.buffer.size` in Python workers.
2. Adds a runtime buffer size configuration for Pandas UDFs, `spark.sql.pandas.udf.buffer.size` (which falls back to `spark.buffer.size`).

## How was this patch tested?

Manually tested:

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 62.68265891075134
```

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 34.00594782829285
```

Closes #24826 from HyukjinKwon/SPARK-27870.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-15 20:56:22 +09:00
Liang-Chi Hsieh c0297dedd8 [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window
## What changes were proposed in this pull request?

I suspect that the doc of the `rowsBetween` methods in Scala and PySpark is wrong.
Because:

```scala
scala> val df = Seq((1, "a"), (2, "a"), (3, "a"), (4, "a"), (5, "a"), (6, "a")).toDF("id", "category")
df: org.apache.spark.sql.DataFrame = [id: int, category: string]

scala> val byCategoryOrderedById = Window.partitionBy('category).orderBy('id).rowsBetween(-1, 2)
byCategoryOrderedById: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec7f04de97

scala> df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  1|       a|  6|              # sum from index 0 to (0 + 2): 1 + 2 + 3 = 6
|  2|       a| 10|              # sum from index (1 - 1) to (1 + 2): 1 + 2 + 3 + 4 = 10
|  3|       a| 14|
|  4|       a| 18|
|  5|       a| 15|
|  6|       a| 11|
+---+--------+---+
```

So the frame (-1, 2) for row with index 5, as described in the doc, should range from index 4 to index 7.

## How was this patch tested?

N/A, just a doc change.

Closes #24864 from viirya/window-spec-doc.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-14 09:56:37 +09:00
zhengruifeng 7281784883 [SPARK-16692][ML][PYTHON] add MultilabelClassificationEvaluator
## What changes were proposed in this pull request?

Add MultilabelClassificationEvaluator.
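
A hedged usage sketch (data is illustrative; `prediction` and `label` are arrays of label ids as doubles):

```python
from pyspark.ml.evaluation import MultilabelClassificationEvaluator

df = spark.createDataFrame(
    [([0.0, 1.0], [0.0, 2.0]), ([1.0, 2.0], [1.0, 2.0])],
    ["prediction", "label"])
evaluator = MultilabelClassificationEvaluator(metricName="f1Measure")
print(evaluator.evaluate(df))
```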

## How was this patch tested?

Added test suites.

Closes #24777 from zhengruifeng/multi_label_eval.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-13 07:58:22 -05:00
Liang-Chi Hsieh ddf4a50312 [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
## What changes were proposed in this pull request?

I just found that the doctest for the `over` function of `Column` is commented out. The window spec there also doesn't match the window function used.

We should either remove the doctest, or improve it.

Because other `Column` functions generally have doctests, this PR tries to improve it.
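
A sketch of the kind of doctest that fits here (data is illustrative):

```python
from pyspark.sql import Window
from pyspark.sql.functions import rank

df = spark.createDataFrame(
    [(2, "Alice"), (5, "Bob"), (5, "Alice")], ["age", "name"])
window = Window.partitionBy("name").orderBy("age")
df.select("name", "age", rank().over(window).alias("rank")).show()
```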

## How was this patch tested?

Added doctest.

Closes #24854 from viirya/column-test-minor.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-13 11:04:41 +09:00
HyukjinKwon 1217996f15 [SPARK-27995][PYTHON] Note the difference between str of Python 2 and 3 at Arrow optimized
## What changes were proposed in this pull request?

When Arrow optimization is enabled in Python 2.7,

```python
import pandas
pdf = pandas.DataFrame(["test1", "test2"])
df = spark.createDataFrame(pdf)
df.show()
```

I got the following output:

```
+----------------+
|               0|
+----------------+
|[74 65 73 74 31]|
|[74 65 73 74 32]|
+----------------+
```

This happens because Python 2's `str` and `bytes` are the same. It does look right:

```python
>>> str == bytes
True
>>> isinstance("a", bytes)
True
```

To cut it short:

1. Python 2 treats `str` as `bytes`.
2. PySpark added some special codes and hacks to recognizes `str` as string types.
3. PyArrow / Pandas followed Python 2 difference

To fix, we have two options:

1. Fix it to match the behaviour to PySpark's
2. Note the differences

But Python 2 is deprecated anyway. I think it's better to just note it and go for option 2.

## How was this patch tested?

Manually tested.

Doc was checked too:

![Screen Shot 2019-06-11 at 6 40 07 PM](https://user-images.githubusercontent.com/6477701/59261402-59ad3b00-8c78-11e9-94a6-3236a2c338d4.png)

Closes #24838 from HyukjinKwon/SPARK-27995.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-11 18:43:59 +09:00
HyukjinKwon f984f6acfe Revert "[SPARK-27870][SQL][PYSPARK] Flush batch timely for pandas UDF (for improving pandas UDFs pipeline)"
## What changes were proposed in this pull request?

This PR reverts 9c4eb99c52 for the reasons below:

1. An alternative was not considered properly, https://github.com/apache/spark/pull/24734#issuecomment-500101639 https://github.com/apache/spark/pull/24734#issuecomment-500102340 https://github.com/apache/spark/pull/24734#issuecomment-499202982 - I opened a PR https://github.com/apache/spark/pull/24826

2. 9c4eb99c52 fixed timely flushing, but the behaviour is somewhat hacky and the timing isn't guaranteed either (in case each batch takes longer to process).

3. For pipelining smaller batches, it looks better to allow configuring the buffer size rather than having another factor that triggers flushing.

## How was this patch tested?

N/A

Closes #24827 from HyukjinKwon/revert-flush.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-06-09 08:28:31 -07:00
WeichenXu 9c4eb99c52 [SPARK-27870][SQL][PYSPARK] Flush batch timely for pandas UDF (for improving pandas UDFs pipeline)
## What changes were proposed in this pull request?

Flush batch timely for pandas UDF.

This could improve performance when multiple pandas UDF plans are pipelined.

When each batch is flushed in time, downstream pandas UDFs get pipelined as soon as possible, and the pipeline helps hide the downstream UDFs' computation time. For example:

When the first UDF start computing on batch-3, the second pipelined UDF can start computing on batch-2, and the third pipelined UDF can start computing on batch-1.

If we do not flush each batch in time, the downstream UDF's pipeline will lag behind too much, which may increase the total processing time.

I add a flush in two places:
* where the JVM process feeds data into the Python worker: on the JVM side, flush after writing each batch
* where the JVM process reads data from the Python worker's output: on the Python worker side, flush after writing each batch

Without flushing, the default buffer size for both is 65536. Especially in the ML case, in order to make realtime predictions, we make the batch size very small. The buffer size is too large for that case, which causes the downstream pandas UDF pipeline to lag behind too much.

### Note
* This is only applied to pandas scalar UDF.
* Do not flush after every batch. The minimum interval between two flushes is 0.1 seconds. This avoids too-frequent flushing when the batch size is small. It works like:
```
        last_flush_time = time.time()
        for batch in iterator:
                writer.write_batch(batch)
                flush_time = time.time()
                if self.flush_timely and (flush_time - last_flush_time > 0.1):
                      stream.flush()
                      last_flush_time = flush_time
```

## How was this patch tested?

### Benchmark to make sure the flush does not cause a performance regression
#### Test code:
```
numRows = ...
batchSize = ...

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', str(batchSize))
df = spark.range(1, numRows + 1, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 10

beg_time = time.time()
result = df.select(sum(fp1('a'))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```
#### Test Result:

 params        | Consume time (Before) | Consume time (After)
------------ | ----------------------- | ----------------------
numRows=100000000, batchSize=10000 | 23.43s | 24.64s
numRows=100000000, batchSize=1000 | 36.73s | 34.50s
numRows=10000000, batchSize=100 | 35.67s | 32.64s
numRows=1000000, batchSize=10 | 33.60s | 32.11s
numRows=100000, batchSize=1 | 33.36s | 31.82s

### Benchmark pipelined pandas UDF
#### Test code:
```
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))

```
#### Test Result:

**Before**: consume time: 63.57s
**After**: consume time: 32.43s
**So the PR improves performance by making downstream UDFs get pipelined early.**

Closes #24734 from WeichenXu123/improve_pandas_udf_pipeline.

Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2019-06-07 14:02:43 -07:00
David Vogelbacher f9ca8ab196 [SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow enabled
## What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/24070, we now propagate SparkExceptions that are encountered during the collect in the Java process to the Python process.

Fixes https://jira.apache.org/jira/browse/SPARK-27805

## How was this patch tested?
Added a new unit test

Closes #24677 from dvogelbacher/dv/betterErrorMsgWhenUsingArrow.

Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-06-04 10:10:27 -07:00
ozan a38d605d0d [SPARK-18570][ML][R] RFormula support * and ^ operators
## What changes were proposed in this pull request?

Added support for `*` and `^` operators, along with expressions within parentheses. New operators just expand to already supported terms, such as:

 - y ~ a * b = y ~ a + b + a:b
 - y ~ (a+b+c)^3 = y ~ a + b + c + a:b + a:c + b:c + a:b:c
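
A hedged sketch exercising the new operators through `pyspark.ml.feature.RFormula`, which shares the same parser (data is illustrative):

```python
from pyspark.ml.feature import RFormula

df = spark.createDataFrame(
    [(1.0, 2.0, 3.0, 4.0), (0.0, 5.0, 6.0, 7.0)], ["y", "a", "b", "c"])

# "y ~ a * b" expands to "y ~ a + b + a:b"
formula = RFormula(formula="y ~ a * b")
formula.fit(df).transform(df).select("features", "label").show()
```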

## How was this patch tested?

Added new unit tests to RFormulaParserSuite

mengxr yanboliang

Closes #24764 from ozancicek/rformula.

Authored-by: ozan <ozancancicekci@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-06-04 08:59:30 -05:00
Xiangrui Meng 216eb36560 [SPARK-27887][PYTHON] Add deprecation warning for Python 2
## What changes were proposed in this pull request?

Add deprecation warning for Python 2.

## How was this patch tested?

Manual tests:

Interactive shell:

~~~
$ bin/pyspark
Python 2.7.15 |Anaconda, Inc.| (default, Nov 13 2018, 17:07:45)
[GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/06/03 14:54:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/Users/meng/src/spark/python/pyspark/context.py:219: DeprecationWarning: Support for Python 2 is deprecated as of Spark 3.0. See the plan for dropping Python 2 support at https://spark.apache.org/news/plan-for-dropping-python-2-support.html.
  DeprecationWarning)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Python version 2.7.15 (default, Nov 13 2018 17:07:45)
SparkSession available as 'spark'.
>>>
~~~

spark-submit job (with default log level set to WARN):

~~~
$ bin/spark-submit test.py
19/06/03 14:54:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/Users/meng/src/spark/python/lib/pyspark.zip/pyspark/context.py:219: DeprecationWarning: Support for Python 2 is deprecated as of Spark 3.0. See the plan for dropping Python 2 support at https://spark.apache.org/news/plan-for-dropping-python-2-support.html.
  DeprecationWarning)
~~~

Verified that warning messages do not show up in Python 3.

`  DeprecationWarning)` is displayed at the end because `warn` by default prints the code line. This behavior can be changed by monkey-patching `showwarning`: https://stackoverflow.com/questions/2187269/print-only-the-message-on-warnings. It might not be worth the effort.

Closes #24786 from mengxr/SPARK-27887.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-04 15:36:52 +09:00
HyukjinKwon db48da87f0 [SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations
## What changes were proposed in this pull request?

`spark.sql.execution.arrow.enabled` was added when we added the PySpark Arrow optimization.
Later, in the current master, SparkR arrow optimization was added and it's controlled by the same configuration `spark.sql.execution.arrow.enabled`.

There are two issues with this:

1. `spark.sql.execution.arrow.enabled` in PySpark was added in 2.3.0, whereas the SparkR optimization was added in 3.0.0. Their stability differs, so it's problematic to change the default value for only one of the two optimizations first.

2. Suppose users want to share one JVM between PySpark and SparkR. They are currently forced to use the optimization for both or neither if the configuration is set globally.

This PR proposes two separate configuration groups for PySpark and SparkR about Arrow optimization:

- Deprecate `spark.sql.execution.arrow.enabled`
- Add `spark.sql.execution.arrow.pyspark.enabled` (fallback to `spark.sql.execution.arrow.enabled`)
- Add `spark.sql.execution.arrow.sparkr.enabled`
- Deprecate `spark.sql.execution.arrow.fallback.enabled`
- Add `spark.sql.execution.arrow.pyspark.fallback.enabled` (fallback to `spark.sql.execution.arrow.fallback.enabled`)

Note that `spark.sql.execution.arrow.maxRecordsPerBatch` is used within JVM side for both.
Note that `spark.sql.execution.arrow.fallback.enabled` was added due to a behaviour change. We don't need it in SparkR - the SparkR side has automatic fallback.
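
A minimal sketch of switching the PySpark-side optimization with the new configs:

```python
# PySpark-only switches; SparkR has its own independent
# spark.sql.execution.arrow.sparkr.enabled
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
```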

## How was this patch tested?

Manually tested and some unittests were added.

Closes #24700 from HyukjinKwon/separate-sparkr-arrow.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-03 10:01:37 +09:00
Jose Torres 5fae8f7b1d [SPARK-27711][CORE] Unset InputFileBlockHolder at the end of tasks
## What changes were proposed in this pull request?

Unset InputFileBlockHolder at the end of tasks to stop the file name from leaking over to other tasks in the same thread. This happens in particular in PySpark because of its complex threading model.

## How was this patch tested?

new pyspark test

Closes #24605 from jose-torres/fix254.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2019-05-22 18:35:50 -07:00
David Vogelbacher 034cb139a1 [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
## What changes were proposed in this pull request?
https://github.com/apache/spark/pull/22275 introduced a performance improvement where we send partitions out of order to Python and then, as a last step, send the partition order as well.
However, if there are no partitions, we will never send the partition order, and we will get an `EOFError` on the Python side.
This PR fixes this by also sending the partition order if there are no partitions present.
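
A minimal sketch of the case being fixed, assuming Arrow is enabled (the exact test code is in the PR):

```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# A DataFrame over an empty RDD has no partitions, so the out-of-order
# protocol previously never sent the partition order and the Python side
# failed with EOFError.
schema = StructType([StructField("field1", StringType(), True)])
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
print(df.toPandas().empty)  # True after the fix
```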

## How was this patch tested?
New unit test added.

Closes #24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.

Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-22 13:21:26 +09:00
Sean Owen eed6de1a65 [MINOR][DOCS] Tighten up some key links to the project and download pages to use HTTPS
## What changes were proposed in this pull request?

Tighten up some key links to the project and download pages to use HTTPS

## How was this patch tested?

N/A

Closes #24665 from srowen/HTTPSURLs.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-05-21 10:56:42 -07:00
HyukjinKwon 20fb01bbea [MINOR][PYTHON] Remove explain(True) in test_udf.py
## What changes were proposed in this pull request?

Not a big deal but it bugged me. This PR removes printing out plans in PySpark UDF tests.

Before:

```
Running tests...
----------------------------------------------------------------------
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [id#668L, <lambda>(id#668L) AS copy#673]
      +- Sort [id#668L ASC NULLS FIRST], true
         +- Range (0, 10, step=1, splits=Some(4))

== Analyzed Logical Plan ==
id: bigint, copy: int
GlobalLimit 1
+- LocalLimit 1
   +- Project [id#668L, <lambda>(id#668L) AS copy#673]
      +- Sort [id#668L ASC NULLS FIRST], true
         +- Range (0, 10, step=1, splits=Some(4))

== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [id#668L, pythonUDF0#676 AS copy#673]
      +- BatchEvalPython [<lambda>(id#668L)], [id#668L, pythonUDF0#676]
         +- Range (0, 10, step=1, splits=Some(4))

== Physical Plan ==
CollectLimit 1
+- *(2) Project [id#668L, pythonUDF0#676 AS copy#673]
   +- BatchEvalPython [<lambda>(id#668L)], [id#668L, pythonUDF0#676]
      +- *(1) Range (0, 10, step=1, splits=4)

...........................................
----------------------------------------------------------------------
Ran 43 tests in 19.777s
```

After:

```
Running tests...
----------------------------------------------------------------------
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
...........................................
----------------------------------------------------------------------
Ran 43 tests in 25.201s
```

## How was this patch tested?

N/A

Closes #24661 from HyukjinKwon/remove-explain-in-test.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-21 23:39:31 +09:00
Reynold Xin cfe236f695 [MINOR][DOCS] Make Spark's description consistent in docs with websites
We updated our website a long time ago to describe Spark as the unified analytics engine, which is also how Spark is described in the community now. But our README and docs page still use the same description from 2011 ... This patch updates them.

The patch also updates the README example to use more modern APIs, and refer to Structured Streaming rather than Spark Streaming.

Closes #24573 from rxin/consistent-message.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-10 17:55:23 +09:00
qb-tarushg 9b3211a194 [SPARK-27540][MLLIB] Add 'meanAveragePrecision_at_k' metric to RankingMetrics
## What changes were proposed in this pull request?

Added a method `meanAveragePrecisionAt(k)` to RankingMetrics.
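
A hedged usage sketch, assuming the metric is also exposed through the PySpark wrapper as `meanAveragePrecisionAt(k)`:

```
from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics

spark = SparkSession.builder.getOrCreate()

# (predicted ranking, ground-truth labels) pairs
predictionAndLabels = spark.sparkContext.parallelize([
    ([1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [1, 2, 3, 4, 5]),
    ([4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3]),
])
metrics = RankingMetrics(predictionAndLabels)
print(metrics.meanAveragePrecision)       # existing metric over full lists
print(metrics.meanAveragePrecisionAt(2))  # new: truncated at k = 2
```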

This branch is rebased with squashed commits from https://github.com/apache/spark/pull/24458

## How was this patch tested?

Added code in the existing test RankingMetricsSuite.


Closes #24543 from qb-tarushg/SPARK-27540-REBASE.

Authored-by: qb-tarushg <tarush.grover@quantumblack.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-09 08:47:05 -05:00
Gengliang Wang 78a403fab9 [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources
## What changes were proposed in this pull request?

### Background:
The data source option `pathGlobFilter` is introduced for the binary file format: https://github.com/apache/spark/pull/24354 . It can be used for filtering file names, e.g. reading only `.png` files while there are `.json` files in the same directory.

### Proposal:
Make the option `pathGlobFilter` a general option for all file sources. The path filtering should happen during path globbing on the driver.
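
For example, a sketch of the generalized option with a non-binary source (`/path/to/dir` is a placeholder; session setup assumed):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With this change, pathGlobFilter applies to any file source, not just
# binaryFile; the filter is applied while globbing paths on the driver.
df = (spark.read.format("json")
      .option("pathGlobFilter", "*.json")
      .load("/path/to/dir"))
```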

### Motivation:
Filtering the file path names in file scan tasks on executors is kind of ugly.

### Impact:
1. The splitting of file partitions will be more balanced.
2. The metrics of file scan will be more accurate.
3. Users can use the option for reading other file sources.

## How was this patch tested?

Unit tests

Closes #24518 from gengliangwang/globFilter.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-09 08:41:43 +09:00
Bryan Cutler 5e79ae3b40 [SPARK-23961][SPARK-27548][PYTHON] Fix error when toLocalIterator goes out of scope and properly raise errors from worker
## What changes were proposed in this pull request?

This fixes an error when a PySpark local iterator, for both RDD and DataFrames, goes out of scope and the connection is closed before fully consuming the iterator. The error occurs on the JVM in the serving thread, when Python closes the local socket while the JVM is writing to it. This usually happens when there is enough data to fill the socket read buffer, causing the write call to block.

Additionally, this fixes a problem when an error occurs in the Python worker and the collect job is cancelled with an exception. Previously, the Python driver was never notified of the error so the user could get a partial result (iteration until the error) and the application will continue. With this change, an error in the worker is sent to the Python iterator and is then raised.

The change here introduces a protocol for PySpark local iterators that works as follows:

1) The local socket connection is made when the iterator is created
2) When iterating, Python first sends a request for partition data as a non-zero integer
3) While the JVM local iterator over partitions has next, it triggers a job to collect the next partition
4) The JVM sends a nonzero response to indicate it has the next partition to send
5) The next partition is sent to Python and read by the PySpark deserializer
6) After sending the entire partition, an `END_OF_DATA_SECTION` is sent to Python, which stops the deserializer and allows another request to be made
7) When the JVM gets a request from Python but has already consumed its local iterator, it will send a zero response to Python and both will close the socket cleanly
8) If an error occurs in the worker, a negative response is sent to Python followed by the error message. Python will then raise a RuntimeError with the message, stopping iteration.
9) When the PySpark local iterator is garbage-collected, it will read any remaining data from the current partition (this is data that has already been collected) and send a request of zero to tell the JVM to stop collection jobs and close the connection.

Steps 1, 3, 5, 6 are the same as before. Step 8 was completely missing before because errors in the worker were never communicated back to Python. The other steps add synchronization to allow for a clean closing of the socket, with a small trade-off in performance for each partition. This is mainly because the JVM does not start collecting partition data until it receives a request to do so, where before it would eagerly write all data until the socket receive buffer is full.
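
A short sketch of the user-visible behaviour this protocol enables, namely partial consumption followed by garbage collection (session setup assumed):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With the new protocol, partitions are collected lazily on request, and
# an error in the Python worker is raised here as a RuntimeError instead
# of silently truncating the results.
it = spark.range(100000).toLocalIterator()
first = next(it)  # triggers a job for the first partition only
del it            # step 9: drains the current partition and closes the socket cleanly
```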

## How was this patch tested?

Added new unit tests for DataFrame and RDD `toLocalIterator` and tested not fully consuming the iterator. Manual tests with Python 2.7 and 3.6.

Closes #24070 from BryanCutler/pyspark-toLocalIterator-clean-stop-SPARK-23961.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-05-07 14:47:39 -07:00
Tibor Csögör eec1a3c286 [SPARK-23299][SQL][PYSPARK] Fix __repr__ behaviour for Rows
This PR is meant to replace #20503, which lay dormant for a while.  The solution in the original PR is still valid, so this is just that patch rebased onto the current master.

Original summary follows.

## What changes were proposed in this pull request?

Fix `__repr__` behaviour for Rows.

`Row.__repr__` assumes the data is a string when column names are missing.
Examples:

```
>>> from pyspark.sql.types import Row
>>> Row ("Alice", "11")
<Row(Alice, 11)>

>>> Row (name="Alice", age=11)
Row(age=11, name='Alice')

>>> Row ("Alice", 11)
<snip stack trace>
TypeError: sequence item 1: expected string, int found
```

This is because `Row()`, when called without column names, assumes everything is a string.

## How was this patch tested?

Manually tested and a unit test was added to `python/pyspark/sql/tests/test_types.py`.

Closes #24448 from tbcs/SPARK-23299.

Lead-authored-by: Tibor Csögör <tibi@tiborius.net>
Co-authored-by: Shashwat Anand <me@shashwat.me>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>
2019-05-06 10:00:49 -07:00
Liang-Chi Hsieh d9bcacf94b [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling
## What changes were proposed in this pull request?

In SPARK-27612, a correctness issue was reported: when protocol 4 is used to pickle Python objects, unpickled objects were wrong. A temporary fix was proposed by not using the highest protocol.

It was found that Opcodes.MEMOIZE appeared in the opcodes under protocol 4 and was suspected to cause this issue.

A deeper dive found that Opcodes.MEMOIZE stores objects into an internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes. If the map is not cleared, stored objects interfere with the next round of unpickling.

We have two options:

1. Continue to reuse the Unpickler, but call its `close` after each unpickling.
2. Do not reuse the Unpickler and create a new Unpickler object for each unpickling.

This patch takes option 1.

## How was this patch tested?

Passing the test added in SPARK-27612 (#24519).

Closes #24521 from viirya/SPARK-27629.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-04 13:21:08 +09:00
HyukjinKwon 5c479243de [SPARK-27612][PYTHON] Use Python's default protocol instead of highest protocol
## What changes were proposed in this pull request?

This PR partially reverts https://github.com/apache/spark/pull/20691

After we changed the Python pickle protocol to the highest one, it seems to have introduced a correctness bug. This potentially affects all Python-related code paths.

I suspect a bug related to Pyrolite (maybe the opcodes `MEMOIZE`, `FRAME` and/or our `RowPickler`). I would like to stick to the default protocol for now and investigate the issue separately.

I will investigate separately later and bring the highest protocol back.
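
A generic illustration of the difference, using plain `pickle` rather than Spark's serializer code (on the Pythons supported at the time, the default protocol is lower than 4):

```
import pickle
import pickletools

data = [{"key": i} for i in range(3)]

# The default protocol does not emit the MEMOIZE opcode; protocol 4
# (the highest on the Pythons of this era) emits FRAME and MEMOIZE,
# which are suspected in the correctness issue described above.
default_bytes = pickle.dumps(data)
highest_bytes = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
pickletools.dis(highest_bytes)  # disassembly shows FRAME/MEMOIZE opcodes
```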

## How was this patch tested?

A unit test was added.

```bash
./run-tests --python-executables=python3.7 --testname "pyspark.sql.tests.test_serde SerdeTests.test_int_array_serialization"
```

Closes #24519 from HyukjinKwon/SPARK-27612.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-05-03 14:40:13 +09:00
sangramga 8375103568 [SPARK-27557][DOC] Add copy button to Python API docs for easier copying of code-blocks
## What changes were proposed in this pull request?

Add a non-intrusive button to the Python API documentation that removes ">>>" prompts and code output, for easier copying of code.

For example, the code snippet below is difficult to copy due to the ">>>" prompts:
```
>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1='Alice', _2=1)]

```
After the copy button in the corner of the code block is pressed, it becomes this, which is easier to copy:
```
l = [('Alice', 1)]
spark.createDataFrame(l).collect()
```

![image](https://user-images.githubusercontent.com/9406431/56715817-560c3600-6756-11e9-8bae-58a3d2d57df3.png)

## File changes
Made changes to python/docs/conf.py and copybutton.js, thus only modifying the Sphinx frontend; no changes were made to the documentation itself. The build process for documentation remains the same.

copybutton.js -> This JS snippet was taken from the official python.org documentation site.

## How was this patch tested?
N/A

Closes #24456 from sangramga/copybutton.

Authored-by: sangramga <sangramga@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-05-01 11:26:18 -05:00
Gabor Somogyi fb6b19ab7c [SPARK-23014][SS] Fully remove V1 memory sink.
## What changes were proposed in this pull request?

There is a MemorySink v2 already, so v1 can be removed. In this PR I've removed it completely.
What this PR contains:
* V1 memory sink removal
* V2 memory sink renamed to become the only implementation
* Since DSv2 sends exceptions in a chained format (linking them with the cause field), I've made the Python side compliant
* Adapted all the tests

## How was this patch tested?

Existing unit tests.

Closes #24403 from gaborgsomogyi/SPARK-23014.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-04-29 09:44:23 -07:00
Jash Gala 90085a1847 [SPARK-23619][DOCS] Add output description for some generator expressions / functions
## What changes were proposed in this pull request?

This PR addresses SPARK-23619: https://issues.apache.org/jira/browse/SPARK-23619

It adds additional comments indicating the default column names for the `explode` and `posexplode` functions in Spark SQL.

Functions for which comments have been updated so far:
* stack
* inline
* explode
* posexplode
* explode_outer
* posexplode_outer
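
For example, the default output column names being documented (shown via PySpark, session setup assumed):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("SELECT explode(array(10, 20))").show()
# default column name: col
spark.sql("SELECT posexplode(array(10, 20))").show()
# default column names: pos, col
```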

## How was this patch tested?

This is just a change in the comments. The package builds and tests successfully after the change.

Closes #23748 from jashgala/SPARK-23619.

Authored-by: Jash Gala <jashgala@amazon.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-27 10:30:12 +09:00
Andrew-Crosby 5bf5d9d854 [SPARK-26970][PYTHON][ML] Add Spark ML interaction transformer to PySpark
## What changes were proposed in this pull request?

Adds the Spark ML Interaction transformer to PySpark
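
A minimal sketch of the new wrapper (column names are illustrative):

```
from pyspark.sql import SparkSession
from pyspark.ml.feature import Interaction, VectorAssembler

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(0.0, 1.0), (2.0, 3.0)], ["a", "b"])
assembler = VectorAssembler(inputCols=["a", "b"], outputCol="vec")
df = assembler.transform(df)

# Interaction outputs the element-wise products of its input columns.
interaction = Interaction(inputCols=["a", "vec"], outputCol="interacted")
interaction.transform(df).show(truncate=False)
```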

## How was this patch tested?

- Added Python doctest
- Ran the newly added example code
- Manually confirmed that a PipelineModel that contains an Interaction transformer can now be loaded in PySpark

Closes #24426 from Andrew-Crosby/pyspark-interaction-transformer.

Lead-authored-by: Andrew-Crosby <37139900+Andrew-Crosby@users.noreply.github.com>
Co-authored-by: Andrew-Crosby <andrew.crosby@autotrader.co.uk>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-04-23 13:53:33 -07:00
Bryan Cutler d36cce18e2 [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds
## What changes were proposed in this pull request?

This increases the minimum supported version of pyarrow to 0.12.1 and removes the workarounds in PySpark that kept it compatible with prior versions. This means that users will need to have at least pyarrow 0.12.1 installed and available in the cluster, or an `ImportError` will be raised to indicate an upgrade is needed.

## How was this patch tested?

Existing tests using:
Python 2.7.15, pyarrow 0.12.1, pandas 0.24.2
Python 3.6.7, pyarrow 0.12.1, pandas 0.24.0

Closes #24298 from BryanCutler/arrow-bump-min-pyarrow-SPARK-27276.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-22 19:30:31 +09:00
shane knapp e1ece6a319 [SPARK-25079][PYTHON] update python3 executable to 3.6.x
## What changes were proposed in this pull request?

Have Jenkins test against Python 3.6 (instead of 3.4).

## How was this patch tested?

extensive testing on both the centos and ubuntu jenkins workers.

NOTE:  this will need to be backported to all active branches.

Closes #24266 from shaneknapp/updating-python3-executable.

Authored-by: shane knapp <incomplete@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-19 10:03:50 +09:00
Bryan Cutler f62f44f2a2 [SPARK-27387][PYTHON][TESTS] Replace sqlutils.assertPandasEqual with Pandas assert_frame_equals
## What changes were proposed in this pull request?

Running PySpark tests with Pandas 0.24.x causes a failure in `test_pandas_udf_grouped_map` test_supported_types:
`ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()`

This is because a column is an ArrayType, and the method `sqlutils.ReusedSQLTestCase.assertPandasEqual` does not properly handle this.

This PR removes `assertPandasEqual` and replaces it with the built-in `pandas.util.testing.assert_frame_equal`, which can properly handle columns of ArrayType and also prints a better diff between the DataFrames when an error occurs.
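
As an illustration, the built-in helper handles object columns holding arrays (assuming the pandas of this era, where it lives under `pandas.util.testing`):

```
import pandas as pd
from pandas.util.testing import assert_frame_equal

# Columns whose cells are lists/arrays made `left == right` ambiguous for
# the old element-wise check; assert_frame_equal compares them correctly
# and prints a readable diff on mismatch.
left = pd.DataFrame({"v": [[1, 2], [3]]})
right = pd.DataFrame({"v": [[1, 2], [3]]})
assert_frame_equal(left, right)  # passes silently
```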

Additionally, imports of pandas and pyarrow were moved to the top of related test files to avoid duplicating the same import many times.

## How was this patch tested?

Existing tests

Closes #24306 from BryanCutler/python-pandas-assert_frame_equal-SPARK-27387.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-04-10 07:50:25 +09:00
Liang-Chi Hsieh d04a7371da [MINOR][DOC][SQL] Remove out-of-date doc about ORC in DataFrameReader and Writer
## What changes were proposed in this pull request?

As things stand, `orc` is available even when Hive support isn't enabled. This is a minor doc change to reflect that.
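
For example (a hypothetical path; no Hive support required):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # no enableHiveSupport() needed

spark.range(3).write.mode("overwrite").orc("/tmp/orc_example")
spark.read.orc("/tmp/orc_example").show()
```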

## How was this patch tested?

Doc only change.

Closes #24280 from viirya/fix-orc-doc.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-04-03 09:11:09 -07:00
Maxim Gekk 1d20d13149 [SPARK-25496][SQL] Deprecate from_utc_timestamp and to_utc_timestamp
## What changes were proposed in this pull request?

In this PR, I propose to deprecate `from_utc_timestamp()` and `to_utc_timestamp()` and disable them by default. The functions can be re-enabled via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any call of these functions throws an analysis exception.

One of the reasons for the deprecation is that the functions violate the semantics of `TimestampType`, which is the number of microseconds since the epoch in the UTC time zone. Shifting microseconds since the epoch by a time zone offset doesn't make sense, because the result no longer represents microseconds since the epoch in UTC and cannot be considered a `TimestampType`.
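
A sketch of the resulting behaviour (config name from the description; session setup assumed and the exact error message may differ):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# By default the deprecated functions now raise an analysis exception.
# They can be re-enabled via the legacy config:
spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
spark.sql(
    "SELECT from_utc_timestamp(timestamp '2019-04-03 10:00:00', 'Asia/Shanghai')"
).show()
```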

## How was this patch tested?

The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.

Closes #24195 from MaxGekk/conv-utc-timestamp-deprecate.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-04-03 10:55:56 +08:00