## What changes were proposed in this pull request?
Currently, PythonBroadcast may delete its data file while a python worker still needs it. This happens because PythonBroadcast overrides the `finalize()` method to delete its data file. So, when GC happens and no references on broadcast variable, it may trigger `finalize()` to delete
data file. That's also means, data under python Broadcast variable couldn't be deleted when `unpersist()`/`destroy()` called but relys on GC.
In this PR, we removed the `finalize()` method, and map the PythonBroadcast data file to a BroadcastBlock(which has the same broadcast id with the broadcast variable who wrapped this PythonBroadcast) when PythonBroadcast is deserializing. As a result, the data file could be deleted just like other pieces of the Broadcast variable when `unpersist()`/`destroy()` called and do not rely on GC any more.
## How was this patch tested?
Added a Python test, and tested manually(verified create/delete the broadcast block).
Closes#25262 from Ngone51/SPARK-28486.
Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR picks up https://github.com/apache/spark/pull/25315 back after removing `Popen.wait` usage which exists in Python 3 only. I saw the last test results wrongly and thought it was passed.
Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.
## How was this patch tested?
Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed
Closes#25343 from HyukjinKwon/SPARK-28582.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Update HashingTF to use new implementation of MurmurHash3
Make HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded
## How was this patch tested?
Change existing unit tests. Also add one unit test to make sure HashingTF use the old MurmurHash3 when a model from pre 3.0 is loaded
Closes#25303 from huaxingao/spark-23469.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Fix flaky test DaemonTests.do_termination_test which fail on Python 3.7. I add a sleep after the test connection to daemon.
## How was this patch tested?
Run test
```
python/run-tests --python-executables=python3.7 --testname "pyspark.tests.test_daemon DaemonTests"
```
**Before**
Fail on test "test_termination_sigterm". And we can see daemon process do not exit.
**After**
Test passed
Closes#25315 from WeichenXu123/fix_py37_daemon.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.
Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.
1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.
2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.
3. Both now have two different references. Updating at child isn't reflected to parent.
This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.
I also tried to explain this a bit more at https://github.com/apache/spark/pull/24958#discussion_r297203041.
## How was this patch tested?
Manually tested and unittest was added.
Closes#24958 from HyukjinKwon/SPARK-28153.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
I remove the deprecate `ImageSchema.readImages`.
Move some useful methods from class `ImageSchema` into class `ImageFileFormat`.
In pyspark, I rename `ImageSchema` class to be `ImageUtils`, and keep some useful python methods in it.
## How was this patch tested?
UT.
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes#25245 from WeichenXu123/remove_image_schema.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
PySpark worker daemon reads from stdin the worker PIDs to kill. 1bb60ab839/python/pyspark/daemon.py (L127)
However, the worker process is a forked process from the worker daemon process and we didn't close stdin on the child after fork. This means the child and user program can read stdin as well, which blocks daemon from receiving the PID to kill. This can cause issues because the task reaper might detect the task was not terminated and eventually kill the JVM.
This PR fix this by redirecting the standard input of the forked child to devnull.
## How was this patch tested?
Manually test.
In `pyspark`, run:
```
import subprocess
def task(_):
subprocess.check_output(["cat"])
sc.parallelize(range(1), 1).mapPartitions(task).count()
```
Before:
The job will get stuck and press Ctrl+C to exit the job but the python worker process do not exit.
After:
The job finish correctly. The "cat" print nothing (because the dummay stdin is "/dev/null").
The python worker process exit normally.
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes#25138 from WeichenXu123/SPARK-26175.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Implement `RobustScaler`
Since the transformation is quite similar to `StandardScaler`, I refactor the transform function so that it can be reused in both scalers.
## How was this patch tested?
existing and added tests
Closes#25160 from zhengruifeng/robust_scaler.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In the PR, I propose to use `uuuu` for years instead of `yyyy` in date/timestamp patterns without the era pattern `G` (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). **Parsing/formatting of positive years (current era) will be the same.** The difference is in formatting negative years belong to previous era - BC (Before Christ).
I replaced the `yyyy` pattern by `uuuu` everywhere except:
1. Test, Suite & Benchmark. Existing tests must work as is.
2. `SimpleDateFormat` because it doesn't support the `uuuu` pattern.
3. Comments and examples (except comments related to already replaced patterns).
Before the changes, the year of common era `100` and the year of BC era `-99`, showed similarly as `100`. After the changes negative years will be formatted with the `-` sign.
Before:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+----------+
| value|
+----------+
|0100-01-01|
+----------+
```
After:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+-----------+
| value|
+-----------+
|-0099-01-01|
+-----------+
```
## How was this patch tested?
By existing test suites, and added tests for negative years to `DateFormatterSuite` and `TimestampFormatterSuite`.
Closes#25230 from MaxGekk/year-pattern-uuuu.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Add indexOf method for ml.feature.HashingTF.
## How was this patch tested?
Add Unit test.
Closes#25250 from huaxingao/spark-21481.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Replace duplicate code by function `_memory_limit`
## How was this patch tested?
Existing UTs
Closes#25273 from WangGuangxin/python_memory_limit.
Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
remove deprecated ``` def context(self, sqlContext)``` from ```pyspark/ml/util.py```
## How was this patch tested?
test with existing ml PySpark test suites
Closes#25246 from huaxingao/spark-28507.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Remove deprecated setFeatureSubsetStrategy and setSubsamplingRate from Python TreeEnsembleParams
## How was this patch tested?
Use existing tests.
Closes#25046 from huaxingao/spark-28243.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This adds simple check for `count` argument:
- If it is a `Column` we apply `_to_java_column` before invoking JVM counterpart
- Otherwise we proceed as before.
## How was this patch tested?
Manual testing.
Closes#25193 from zero323/SPARK-28278.
Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In the following python code
```
df.write.mode("overwrite").insertInto("table")
```
```insertInto``` ignores ```mode("overwrite")``` and appends by default.
## How was this patch tested?
Add Unit test.
Closes#25175 from huaxingao/spark-28411.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose to convert options values to strings by using `to_str()` for the following functions: `from_csv()`, `to_csv()`, `from_json()`, `to_json()`, `schema_of_csv()` and `schema_of_json()`. This will make handling of function options consistent to option handling in `DataFrameReader`/`DataFrameWriter`.
For example:
```Python
df.select(from_csv(df.value, "s string", {'ignoreLeadingWhiteSpace': True})
```
## How was this patch tested?
Added an example for `from_csv()` which was tested by:
```Shell
./python/run-tests --testnames pyspark.sql.functions
```
Closes#25182 from MaxGekk/options_to_str.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
It fixes a flaky test:
```
ERROR [0.164s]: test_query_execution_listener_on_collect (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/jenkins/python/pyspark/sql/tests/test_dataframe.py", line 758, in test_query_execution_listener_on_collect
"The callback from the query execution listener should be called after 'collect'")
AssertionError: The callback from the query execution listener should be called after 'collect'
```
Seems it can be failed because the event was somehow delayed but checked first.
## How was this patch tested?
Manually.
Closes#25177 from HyukjinKwon/SPARK-28418.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This upgraded to a newer version of Pyrolite. Most updates [1] in the newer version are for dotnot. For java, it includes a bug fix to Unpickler regarding cleaning up Unpickler memo, and support of protocol 5.
After upgrading, we can remove the fix at SPARK-27629 for the bug in Unpickler.
[1] https://github.com/irmen/Pyrolite/compare/pyrolite-4.23...master
## How was this patch tested?
Manually tested on Python 3.6 in local on existing tests.
Closes#25143 from viirya/upgrade-pyrolite.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The `_prepare_for_python_RDD` method currently broadcasts a pickled command if its length is greater than the hardcoded value `1 << 20` (1M). This change sets this value as a Spark conf instead.
## How was this patch tested?
Unit tests, manual tests.
Closes#25123 from jessecai/SPARK-28355.
Authored-by: Jesse Cai <jesse.cai@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Add python api support and JavaSparkContext support for resources(). I needed the JavaSparkContext support for it to properly translate into python with the py4j stuff.
## How was this patch tested?
Unit tests added and manually tested in local cluster mode and on yarn.
Closes#25087 from tgravescs/SPARK-28234-python.
Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
There is a bug in `ExtractPythonUDFs` that produces wrong result attributes. It causes a failure when using `PythonUDF`s among multiple child plans, e.g., join. An example is using `PythonUDF`s in join condition.
```python
>>> left = spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
>>> right = spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
>>> f = udf(lambda a: a, IntegerType())
>>> df = left.join(right, [f("a") == f("b"), left.a1 == right.b1])
>>> df.collect()
19/07/10 12:20:49 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70)
...
```
## How was this patch tested?
Added test.
Closes#25091 from viirya/SPARK-28323.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
In both cases, the input `DataFrame` schema must contain only the information that's required for the matrix object, so a vector column in the case of `RowMatrix` and long and vector columns for `IndexedRowMatrix`.
## How was this patch tested?
Unit tests that verify:
- `RowMatrix` and `IndexedRowMatrix` can be created from `DataFrame`s
- If the schema does not match expectations, we throw an `IllegalArgumentException`
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes#24953 from henrydavidge/row-matrix-df.
Authored-by: Henry D <henrydavidge@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR adds some tests converted from `pgSQL/case.sql'` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
This PR also contains two minor fixes:
1. Change name of Scala UDF from `UDF:name(...)` to `name(...)` to be consistent with Python'
2. Fix Scala UDF at `IntegratedUDFTestUtils.scala ` to handle `null` in strings.
<details><summary>Diff comparing to 'pgSQL/case.sql'</summary>
<p>
```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
index fa078d16d6d..55bef64338f 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
-115,7 +115,7 struct<>
-- !query 13
SELECT '3' AS `One`,
CASE
- WHEN 1 < 2 THEN 3
+ WHEN CAST(udf(1 < 2) AS boolean) THEN 3
END AS `Simple WHEN`
-- !query 13 schema
struct<One:string,Simple WHEN:int>
-126,10 +126,10 struct<One:string,Simple WHEN:int>
-- !query 14
SELECT '<NULL>' AS `One`,
CASE
- WHEN 1 > 2 THEN 3
+ WHEN 1 > 2 THEN udf(3)
END AS `Simple default`
-- !query 14 schema
-struct<One:string,Simple default:int>
+struct<One:string,Simple default:string>
-- !query 14 output
<NULL> NULL
-137,17 +137,17 struct<One:string,Simple default:int>
-- !query 15
SELECT '3' AS `One`,
CASE
- WHEN 1 < 2 THEN 3
- ELSE 4
+ WHEN udf(1) < 2 THEN udf(3)
+ ELSE udf(4)
END AS `Simple ELSE`
-- !query 15 schema
-struct<One:string,Simple ELSE:int>
+struct<One:string,Simple ELSE:string>
-- !query 15 output
3 3
-- !query 16
-SELECT '4' AS `One`,
+SELECT udf('4') AS `One`,
CASE
WHEN 1 > 2 THEN 3
ELSE 4
-159,10 +159,10 struct<One:string,ELSE default:int>
-- !query 17
-SELECT '6' AS `One`,
+SELECT udf('6') AS `One`,
CASE
- WHEN 1 > 2 THEN 3
- WHEN 4 < 5 THEN 6
+ WHEN CAST(udf(1 > 2) AS boolean) THEN 3
+ WHEN udf(4) < 5 THEN 6
ELSE 7
END AS `Two WHEN with default`
-- !query 17 schema
-173,7 +173,7 struct<One:string,Two WHEN with default:int>
-- !query 18
SELECT '7' AS `None`,
- CASE WHEN rand() < 0 THEN 1
+ CASE WHEN rand() < udf(0) THEN 1
END AS `NULL on no matches`
-- !query 18 schema
struct<None:string,NULL on no matches:int>
-182,36 +182,36 struct<None:string,NULL on no matches:int>
-- !query 19
-SELECT CASE WHEN 1=0 THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
+SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
-- !query 19 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN CAST(udf((1 = 0)) AS BOOLEAN) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
-- !query 19 output
1.0
-- !query 20
-SELECT CASE 1 WHEN 0 THEN 1/0 WHEN 1 THEN 1 ELSE 2/0 END
+SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END
-- !query 20 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(CAST(udf(0) AS DOUBLE) AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
-- !query 20 output
1.0
-- !query 21
-SELECT CASE WHEN i > 100 THEN 1/0 ELSE 0 END FROM case_tbl
+SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl
-- !query 21 schema
-struct<CASE WHEN (i > 100) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) ELSE CAST(0 AS DOUBLE) END:double>
+struct<CASE WHEN (i > 100) THEN udf((cast(1 as double) / cast(0 as double))) ELSE udf(0) END:string>
-- !query 21 output
-0.0
-0.0
-0.0
-0.0
+0
+0
+0
+0
-- !query 22
-SELECT CASE 'a' WHEN 'a' THEN 1 ELSE 2 END
+SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END
-- !query 22 schema
-struct<CASE WHEN (a = a) THEN 1 ELSE 2 END:int>
+struct<CASE WHEN (a = a) THEN udf(1) ELSE udf(2) END:string>
-- !query 22 output
1
-283,7 +283,7 big
-- !query 27
-SELECT * FROM CASE_TBL WHERE COALESCE(f,i) = 4
+SELECT * FROM CASE_TBL WHERE udf(COALESCE(f,i)) = 4
-- !query 27 schema
struct<i:int,f:double>
-- !query 27 output
-291,7 +291,7 struct<i:int,f:double>
-- !query 28
-SELECT * FROM CASE_TBL WHERE NULLIF(f,i) = 2
+SELECT * FROM CASE_TBL WHERE udf(NULLIF(f,i)) = 2
-- !query 28 schema
struct<i:int,f:double>
-- !query 28 output
-299,10 +299,10 struct<i:int,f:double>
-- !query 29
-SELECT COALESCE(a.f, b.i, b.j)
+SELECT udf(COALESCE(a.f, b.i, b.j))
FROM CASE_TBL a, CASE2_TBL b
-- !query 29 schema
-struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
+struct<udf(coalesce(f, cast(i as double), cast(j as double))):string>
-- !query 29 output
-30.3
-30.3
-332,8 +332,8 struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
-- !query 30
SELECT *
- FROM CASE_TBL a, CASE2_TBL b
- WHERE COALESCE(a.f, b.i, b.j) = 2
+ FROM CASE_TBL a, CASE2_TBL b
+ WHERE udf(COALESCE(a.f, b.i, b.j)) = 2
-- !query 30 schema
struct<i:int,f:double,i:int,j:int>
-- !query 30 output
-342,7 +342,7 struct<i:int,f:double,i:int,j:int>
-- !query 31
-SELECT '' AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
+SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
NULLIF(b.i, 4) AS `NULLIF(b.i,4)`
FROM CASE_TBL a, CASE2_TBL b
-- !query 31 schema
-377,7 +377,7 struct<Five:string,NULLIF(a.i,b.i):int,NULLIF(b.i,4):int>
-- !query 32
SELECT '' AS `Two`, *
FROM CASE_TBL a, CASE2_TBL b
- WHERE COALESCE(f,b.i) = 2
+ WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean)
-- !query 32 schema
struct<Two:string,i:int,f:double,i:int,j:int>
-- !query 32 output
-388,15 +388,15 struct<Two:string,i:int,f:double,i:int,j:int>
-- !query 33
SELECT CASE
(CASE vol('bar')
- WHEN 'foo' THEN 'it was foo!'
- WHEN vol(null) THEN 'null input'
+ WHEN udf('foo') THEN 'it was foo!'
+ WHEN udf(vol(null)) THEN 'null input'
WHEN 'bar' THEN 'it was bar!' END
)
- WHEN 'it was foo!' THEN 'foo recognized'
- WHEN 'it was bar!' THEN 'bar recognized'
- ELSE 'unrecognized' END
+ WHEN udf('it was foo!') THEN 'foo recognized'
+ WHEN 'it was bar!' THEN udf('bar recognized')
+ ELSE 'unrecognized' END AS col
-- !query 33 schema
-struct<CASE WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
+struct<col:string>
-- !query 33 output
bar recognized
```
</p>
</details>
https://github.com/apache/spark/pull/25069 contains the same minor fixes as it's required to write the tests.
## How was this patch tested?
Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
Closes#25070 from HyukjinKwon/SPARK-28273.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type .
Had an offline discussion with rxin, mengxr and cloud-fan
The reason is basically:
1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764
Renaming `SCALAR_ITER` as `MAP_ITER` is abandoned due to 2. reason.
For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic with `mapPartitionsInPandas`.
## How was this patch tested?
Existing tests should cover.
Closes#25044 from HyukjinKwon/SPARK-28198.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In Python 2.7 with latest PyArrow and Pandas, the error message seems a bit different with Python 3. This PR simply fixes the test.
```
======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.ArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"
======================================================================
FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema
self.spark.createDataFrame(pdf, schema=wrong_schema)
AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))"
```
## How was this patch tested?
Manually tested.
```
cd python
./run-tests --python-executables=python --modules pyspark-sql
```
Closes#25042 from HyukjinKwon/SPARK-28240.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR proposes to add `mapPartitionsInPandas` API to DataFrame by using existing `SCALAR_ITER` as below:
1. Filtering via setting the column
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapPartitionsInPandas(filter_func).show()
```
```
+---+---+
| id|age|
+---+---+
| 1| 21|
+---+---+
```
2. `DataFrame.loc`
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
df = spark.createDataFrame([['aa'], ['bb'], ['cc'], ['aa'], ['aa'], ['aa']], ["value"])
pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
for pdf in iterator:
yield pdf.loc[pdf.value.str.contains('^a'), :]
df.mapPartitionsInPandas(filter_func).show()
```
```
+-----+
|value|
+-----+
| aa|
| aa|
| aa|
| aa|
+-----+
```
3. `pandas.melt`
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
df = spark.createDataFrame(
pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6}}))
pandas_udf("A string, variable string, value long", PandasUDFType.SCALAR_ITER)
def filter_func(iterator):
for pdf in iterator:
import pandas as pd
yield pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
df.mapPartitionsInPandas(filter_func).show()
```
```
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
```
The current limitation of `SCALAR_ITER` is that it doesn't allow different length of result, which is pretty critical in practice - for instance, we cannot simply filter by using Pandas APIs but we merely just map N to N. This PR allows map N to M like flatMap.
This API mimics the way of `mapPartitions` but keeps API shape of `SCALAR_ITER` by allowing different results.
### How does this PR implement?
This PR adds mimics both `dapply` with Arrow optimization and Grouped Map Pandas UDF. At Python execution side, it reuses existing `SCALAR_ITER` code path.
Therefore, externally, we don't introduce any new type of Pandas UDF but internally we use another evaluation type code `205` (`SQL_MAP_PANDAS_ITER_UDF`).
This approach is similar with Pandas' Windows function implementation with Grouped Aggregation Pandas UDF functions - internally we have `203` (`SQL_WINDOW_AGG_PANDAS_UDF`) but externally we just share the same `GROUPED_AGG`.
## How was this patch tested?
Manually tested and unittests were added.
Closes#24997 from HyukjinKwon/scalar-udf-iter.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The documentation in `linalg.py` is not consistent. This PR uniforms the documentation.
## How was this patch tested?
NA
Closes#25011 from mgaido91/SPARK-28170.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not guaranteed. I will submit another PR to add the same to user guide, just to keep this PR minimal.
I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.
cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123
![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)
## How was this patch tested?
doctest
Closes#25005 from mengxr/SPARK-28056.2.
Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Closes the generator when Python UDFs stop early.
### Manually verification on pandas iterator UDF and mapPartitions
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext
import time
import os
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
try:
for batch in it:
yield batch + 100
time.sleep(1.0)
except BaseException as be:
print("Debug: exception raised: " + str(type(be)))
raise be
finally:
open("/tmp/000001.tmp", "a").close()
df1 = spark.range(10).select(col('id').alias('a')).repartition(1)
# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000001.tmp" generated.
df1.select(col('a'), fi1('a')).limit(2).collect()
def mapper(it):
try:
for batch in it:
yield batch
except BaseException as be:
print("Debug: exception raised: " + str(type(be)))
raise be
finally:
open("/tmp/000002.tmp", "a").close()
df2 = spark.range(10000000).repartition(1)
# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000002.tmp" generated.
df2.rdd.mapPartitions(mapper).take(2)
```
## How was this patch tested?
Unit test added.
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes#24986 from WeichenXu123/pandas_iter_udf_limit.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if the Spark job being run in the background serving thread errors, it will be caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it is only catch a SparkException, it won't handle an error that occurs in the serializer, and each method has to have it's own special handling to propagate the error.
This PR instead returns the Python Server object along with the serving port and authentication info, so that it allows the Python caller to join with the serving thread. During the call to join, the serving thread Future is completed either successfully or with an exception. In the latter case, the exception will be propagated to Python through the Py4j call.
## How was this patch tested?
Existing tests
Closes#24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
add missing RankingEvaluator
## How was this patch tested?
added testsuites
Closes#24869 from zhengruifeng/ranking_eval.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This patch removes `fillna(0)` when creating ArrowBatch from a pandas Series.
With `fillna(0)`, the original code would turn a timestamp type into object type, which pyarrow will complain later:
```
>>> s = pd.Series([pd.NaT, pd.Timestamp('2015-01-01')])
>>> s.dtypes
dtype('<M8[ns]')
>>> s.fillna(0)
0 0
1 2015-01-01 00:00:00
dtype: object
```
## How was this patch tested?
Added `test_timestamp_nat`
Closes#24844 from icexelloss/SPARK-28003-arrow-nat.
Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
Currently, pretty skipped message added by f7435bec6a mechanism seems not working when xmlrunner is installed apparently.
This PR fixes two things:
1. When `xmlrunner` is installed, seems `xmlrunner` does not respect `vervosity` level in unittests (default is level 1).
So the output looks as below
```
Running tests...
----------------------------------------------------------------------
SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS
----------------------------------------------------------------------
```
So it is not caught by our message detection mechanism.
2. If we manually set the `vervocity` level to `xmlrunner`, it prints messages as below:
```
test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
...
```
This is different in our Jenkins machine:
```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
...
```
Note that last `SKIP` is different. This PR fixes the regular expression to catch `SKIP` case as well.
## How was this patch tested?
Manually tested.
**Before:**
```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (0s)
...
Tests passed in 562 seconds
========================================================================
...
```
**After:**
```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (48s) ... 93 tests were skipped
...
Tests passed in 560 seconds
Skipped tests pyspark.... with python2.7:
pyspark...(...) ... SKIP (0.000s)
...
========================================================================
...
```
Closes#24927 from HyukjinKwon/SPARK-28130.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
When running FlatMapGroupsInPandasExec or AggregateInPandasExec the shuffle uses a default number of partitions of 200 in "spark.sql.shuffle.partitions". If the data is small, e.g. in testing, many of the partitions will be empty but are treated just the same.
This PR checks the `mapPartitionsInternal` iterator to be non-empty before calling `ArrowPythonRunner` to start computation on the iterator.
## How was this patch tested?
Existing tests. Ran the following benchmarks a simple example where most partitions are empty:
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
v = pdf.v
return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").apply(normalize).count()
```
**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
Closes#24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
expose more metrics in evaluator: weightedTruePositiveRate/weightedFalsePositiveRate/weightedFMeasure/truePositiveRateByLabel/falsePositiveRateByLabel/precisionByLabel/recallByLabel/fMeasureByLabel
## How was this patch tested?
existing cases and add cases
Closes#24868 from zhengruifeng/multi_class_support_bylabel.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
When using `DROPMALFORMED` mode, corrupted records aren't dropped if malformed columns aren't read. This behavior is due to CSV parser column pruning. Current doc of `DROPMALFORMED` doesn't mention the effect of column pruning. Users will be confused by the fact that `DROPMALFORMED` mode doesn't work as expected.
Column pruning also affects other modes. This is a doc improvement to add a note to doc of `mode` to explain it.
## How was this patch tested?
N/A. This is just doc change.
Closes#24894 from viirya/SPARK-28058.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This increases the minimum supported version of Pandas to 0.23.2. Using a lower version will raise an error `Pandas >= 0.23.2 must be installed; however, your version was 0.XX`. Also, a workaround for using pyarrow with Pandas 0.19.2 was removed.
## How was this patch tested?
Existing Tests
Closes#24867 from BryanCutler/pyspark-increase-min-pandas-SPARK-28041.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series.
Note the UDF input args will be always one iterator:
* if the udf take only column as input, the iterator's element will be pd.Series (corresponding to the column values batch)
* if the udf take multiple columns as inputs, the iterator's element will be a tuple composed of multiple `pd.Series`s, each one corresponding to the multiple columns as inputs (keep the same order). For example:
```
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
for col1_batch, col2_batch in iterator:
yield col1_batch + col2_batch
df.select(the_udf("col1", "col2"))
```
The udf above will add col1 and col2.
I haven't add unit tests, but manually tests show it works fine. So it is ready for first pass review.
We can test several typical cases:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext
df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
pid = TaskContext.get().partitionId()
print("DBG: fi1: do init stuff, partitionId=" + str(pid))
for batch in it:
yield batch + 100
print("DBG: fi1: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
pid = TaskContext.get().partitionId()
print("DBG: fi2: do init stuff, partitionId=" + str(pid))
for batch in it:
yield batch + 10000
print("DBG: fi2: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
pid = TaskContext.get().partitionId()
print("DBG: fi3: do init stuff, partitionId=" + str(pid))
for x, y in it:
yield x + y * 10 + 100000
print("DBG: fi3: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
return x + 1000
udf("int")
def fu1(x):
return x + 10
# test select "pandas iter udf/pandas udf/sql udf" expressions at the same time.
# Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# and `fu1("a")`, `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()
# test chain two pandas iter udf together
# Note this case `fi2(fi1("a"))` will generate only one plan
# Also note the init stuff/close stuff call order will be like:
# (debug output following)
# DBG: fi2: do init stuff, partitionId=0
# DBG: fi1: do init stuff, partitionId=0
# DBG: fi1: do close stuff, partitionId=0
# DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()
# test more complex chain
# Note this case `fi1("a"), fi2("a")` will generate one plan,
# and `fi3(fi1_output, fi2_output)` will generate another plan
df.select(fi3(fi1("a"), fi2("a"))).show()
```
## How was this patch tested?
To be added.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#24643 from WeichenXu123/pandas_udf_iter.
Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?
I suspect that the doc of `rowsBetween` methods in Scala and PySpark looks wrong.
Because:
```scala
scala> val df = Seq((1, "a"), (2, "a"), (3, "a"), (4, "a"), (5, "a"), (6, "a")).toDF("id", "category")
df: org.apache.spark.sql.DataFrame = [id: int, category: string]
scala> val byCategoryOrderedById = Window.partitionBy('category).orderBy('id).rowsBetween(-1, 2)
byCategoryOrderedById: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec7f04de97
scala> df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 6| # sum from index 0 to (0 + 2): 1 + 2 + 3 = 6
| 2| a| 10| # sum from index (1 - 1) to (1 + 2): 1 + 2 + 3 + 4 = 10
| 3| a| 14|
| 4| a| 18|
| 5| a| 15|
| 6| a| 11|
+---+--------+---+
```
So the frame (-1, 2) for row with index 5, as described in the doc, should range from index 4 to index 7.
## How was this patch tested?
N/A, just doc change.
Closes#24864 from viirya/window-spec-doc.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
add MultilabelClassificationEvaluator
## How was this patch tested?
added testsuites
Closes#24777 from zhengruifeng/multi_label_eval.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Just found the doctest on `over` function of `Column` is commented out. The window spec is also not for the window function used there.
We should either remove the doctest, or improve it.
Because other functions of `Column` have doctest generally, so this PR tries to improve it.
## How was this patch tested?
Added doctest.
Closes#24854 from viirya/column-test-minor.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>