## What changes were proposed in this pull request?
This PR proposes to disallow default value None when 'to_replace' is not a dictionary.
It seems weird we set the default value of `value` to `None` and we ended up allowing the case as below:
```python
>>> df.show()
```
```
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
...
```
```python
>>> df.na.replace('Alice').show()
```
```
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80|null|
...
```
**After**
This PR targets to disallow the case above:
```python
>>> df.na.replace('Alice').show()
```
```
...
TypeError: value is required when to_replace is not a dictionary.
```
while we still allow when `to_replace` is a dictionary:
```python
>>> df.na.replace({'Alice': None}).show()
```
```
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80|null|
...
```
## How was this patch tested?
Manually tested, tests were added in `python/pyspark/sql/tests.py` and doctests were fixed.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20499 from HyukjinKwon/SPARK-19454-followup.
## What changes were proposed in this pull request?
This is a followup pr of #20487.
When importing module but it doesn't exists, the error message is slightly different between Python 2 and 3.
E.g., in Python 2:
```
No module named pandas
```
in Python 3:
```
No module named 'pandas'
```
So, one test to check an import error fails in Python 3 without pandas.
This pr fixes it.
## How was this patch tested?
Tested manually in my local environment.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20538 from ueshin/issues/SPARK-23319/fup1.
## What changes were proposed in this pull request?
This PR proposes to explicitly specify Pandas and PyArrow versions in PySpark tests to skip or test.
We declared the extra dependencies:
b8bfce51ab/python/setup.py (L204)
In case of PyArrow:
Currently we only check if pyarrow is installed or not without checking the version. It already fails to run tests. For example, if PyArrow 0.7.0 is installed:
```
======================================================================
ERROR: test_vectorized_udf_wrong_return_type (pyspark.sql.tests.ScalarPandasUDF)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/.../spark/python/pyspark/sql/tests.py", line 4019, in test_vectorized_udf_wrong_return_type
f = pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
File "/.../spark/python/pyspark/sql/functions.py", line 2309, in pandas_udf
return _create_udf(f=f, returnType=return_type, evalType=eval_type)
File "/.../spark/python/pyspark/sql/udf.py", line 47, in _create_udf
require_minimum_pyarrow_version()
File "/.../spark/python/pyspark/sql/utils.py", line 132, in require_minimum_pyarrow_version
"however, your version was %s." % pyarrow.__version__)
ImportError: pyarrow >= 0.8.0 must be installed on calling Python process; however, your version was 0.7.0.
----------------------------------------------------------------------
Ran 33 tests in 8.098s
FAILED (errors=33)
```
In case of Pandas:
There are few tests for old Pandas which were tested only when Pandas version was lower, and I rewrote them to be tested when both Pandas version is lower and missing.
## How was this patch tested?
Manually tested by modifying the condition:
```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 1.19.2 must be installed; however, your version was 0.19.2.'
```
```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
```
```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 1.8.0 must be installed; however, your version was 0.8.0.'
```
```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
test_createDataFrame_respect_session_timezone (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20487 from HyukjinKwon/pyarrow-pandas-skip.
## What changes were proposed in this pull request?
Replace `registerTempTable` by `createOrReplaceTempView`.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20523 from gatorsmile/updateExamples.
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20495 from gatorsmile/updateFunc.
## What changes were proposed in this pull request?
In b2ce17b4c9, I mistakenly renamed `VectorizedUDFTests` to `ScalarPandasUDF`. This PR fixes the mistake.
## How was this patch tested?
Existing tests.
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20489 from icexelloss/fix-scalar-udf-tests.
## What changes were proposed in this pull request?
In Python 2, when `pandas_udf` tries to return string type value created in the udf with `".."`, the execution fails. E.g.,
```python
from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = spark.range(10)
str_f = pandas_udf(lambda x: pd.Series(["%s" % i for i in x]), "string")
df.select(str_f(col('id'))).show()
```
raises the following exception:
```
...
java.lang.AssertionError: assertion failed: Invalid schema from pandas_udf: expected StringType, got BinaryType
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:93)
...
```
Seems like pyarrow ignores `type` parameter for `pa.Array.from_pandas()` and consider it as binary type when the type is string type and the string values are `str` instead of `unicode` in Python 2.
This pr adds a workaround for the case.
## How was this patch tested?
Added a test and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20507 from ueshin/issues/SPARK-23334.
## What changes were proposed in this pull request?
This is a follow-up pr of #19872 which uses `assertRaisesRegex` but it doesn't exist in Python 2, so some tests fail when running tests in Python 2 environment.
Unfortunately, we missed it because currently Python 2 environment of the pr builder doesn't have proper versions of pandas or pyarrow, so the tests were skipped.
This pr modifies to use `assertRaisesRegexp` instead of `assertRaisesRegex`.
## How was this patch tested?
Tested manually in my local environment.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20467 from ueshin/issues/SPARK-22274/fup1.
## What changes were proposed in this pull request?
In the current PySpark code, Python created `jsparkSession` doesn't add to JVM's defaultSession, this `SparkSession` object cannot be fetched from Java side, so the below scala code will be failed when loaded in PySpark application.
```scala
class TestSparkSession extends SparkListener with Logging {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case CreateTableEvent(db, table) =>
val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
assert(session.isDefined)
val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
logInfo(s"Table info ${tableInfo}")
case e =>
logInfo(s"event $e")
}
}
}
```
So here propose to add fresh create `jsparkSession` to `defaultSession`.
## How was this patch tested?
Manual verification.
Author: jerryshao <sshao@hortonworks.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Saisai Shao <sai.sai.shao@gmail.com>
Closes#20404 from jerryshao/SPARK-23228.
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.
- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`
## How was this patch tested?
The existing tests
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20428 from gatorsmile/renamePandasUDFs.
## What changes were proposed in this pull request?
It's not obvious from the comments that any added column must be a
function of the dataset that we are adding it to. Add a comment to
that effect to Scala, Python and R Data* methods.
Author: Henry Robinson <henry@cloudera.com>
Closes#20429 from henryr/SPARK-23157.
## What changes were proposed in this pull request?
Reproducer:
```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id")) # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```
It should return `False` but the current master returns `True`. Seems it's because we cache the JVM UDF instance and then we reuse it even after setting `deterministic` disabled once it's called.
## How was this patch tested?
Manually tested. I am not sure if I should add the test with a lot of JVM accesses with the intetnal stuff .. Let me know if anyone feels so. I will add.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20409 from HyukjinKwon/SPARK-23233.
## What changes were proposed in this pull request?
Add colRegex API to PySpark
## How was this patch tested?
add a test in sql/tests.py
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20390 from huaxingao/spark-23081.
## What changes were proposed in this pull request?
We extract Python UDFs in logical aggregate which depends on aggregate expression or grouping key in ExtractPythonUDFFromAggregate rule. But Python UDFs which don't depend on above expressions should also be extracted to avoid the issue reported in the JIRA.
A small code snippet to reproduce that issue looks like:
```python
import pyspark.sql.functions as f
df = spark.createDataFrame([(1,2), (3,4)])
f_udf = f.udf(lambda: str("const_str"))
df2 = df.distinct().withColumn("a", f_udf())
df2.show()
```
Error exception is raised as:
```
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
```
This exception raises because `HashAggregateExec` tries to bind the aliased Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to grouping key.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#20360 from viirya/SPARK-23177.
## What changes were proposed in this pull request?
Add support for using pandas UDFs with groupby().agg().
This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.
This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.
## How was this patch tested?
GroupbyAggPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19872 from icexelloss/SPARK-22274-groupby-agg.
## What changes were proposed in this pull request?
This PR is to update the docs for UDF registration
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20348 from gatorsmile/testUpdateDoc.
## What changes were proposed in this pull request?
This is a follow-up of #20246.
If a UDT in Python doesn't have its corresponding Scala UDT, cast to string will be the raw string of the internal value, e.g. `"org.apache.spark.sql.catalyst.expressions.UnsafeArrayDataxxxxxxxx"` if the internal type is `ArrayType`.
This pr fixes it by using its `sqlType` casting.
## How was this patch tested?
Added a test and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20306 from ueshin/issues/SPARK-23054/fup1.
## What changes were proposed in this pull request?
Self-explanatory.
## How was this patch tested?
New python tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#20309 from tdas/SPARK-23143.
## What changes were proposed in this pull request?
Currently `UDFRegistration.registerJavaFunction` doesn't support data type string as a `returnType` whereas `UDFRegistration.register`, `udf`, or `pandas_udf` does.
We can support it for `UDFRegistration.registerJavaFunction` as well.
## How was this patch tested?
Added a doctest and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20307 from ueshin/issues/SPARK-23141.
## What changes were proposed in this pull request?
This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.
These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.
Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.
This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158
## How was this patch tested?
Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20288 from HyukjinKwon/deprecate-udf.
## What changes were proposed in this pull request?
Make the default behavior of EXCEPT (i.e. EXCEPT DISTINCT) more
explicit in the documentation, and call out the change in behavior
from 1.x.
Author: Henry Robinson <henry@cloudera.com>
Closes#20254 from henryr/spark-23062.
## What changes were proposed in this pull request?
Register Vectorized UDFs for SQL Statement. For example,
```Python
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> pandas_udf("integer", PandasUDFType.SCALAR)
... def add_one(x):
... return x + 1
...
>>> _ = spark.udf.register("add_one", add_one)
>>> spark.sql("SELECT add_one(id) FROM range(3)").collect()
[Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
```
## How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20171 from gatorsmile/supportVectorizedUDF.
## What changes were proposed in this pull request?
The current `Datset.showString` prints rows thru `RowEncoder` deserializers like;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------------------------------------------+
|a |
+------------------------------------------------------------+
|[WrappedArray(1, 2), WrappedArray(3), WrappedArray(4, 5, 6)]|
+------------------------------------------------------------+
```
This result is incorrect because the correct one is;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------+
|a |
+------------------------+
|[[1, 2], [3], [4, 5, 6]]|
+------------------------+
```
So, this pr fixed code in `showString` to cast field data to strings before printing.
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#20214 from maropu/SPARK-23023.
## What changes were proposed in this pull request?
This PR proposes to add a note that saying the length of a scalar Pandas UDF's `Series` is not of the whole input column but of the batch.
We are fine for a group map UDF because the usage is different from our typical UDF but scalar UDFs might cause confusion with the normal UDF.
For example, please consider this example:
```python
from pyspark.sql.functions import pandas_udf, col, lit
df = spark.range(1)
f = pandas_udf(lambda x, y: len(x) + y, LongType())
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 1|
+------------------+
```
```python
from pyspark.sql.functions import udf, col, lit
df = spark.range(1)
f = udf(lambda x, y: len(x) + y, "long")
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 4|
+------------------+
```
## How was this patch tested?
Manually built the doc and checked the output.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20237 from HyukjinKwon/SPARK-22980.
## What changes were proposed in this pull request?
This the case when calling `SparkSession.createDataFrame` using a Pandas DataFrame that has non-str column labels.
The column name conversion logic to handle non-string or unicode in python2 is:
```
if column is not any type of string:
name = str(column)
else if column is unicode in Python 2:
name = column.encode('utf-8')
```
## How was this patch tested?
Added a new test with a Pandas DataFrame that has int column labels
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.
## What changes were proposed in this pull request?
This fixes createDataFrame from Pandas to only assign modified timestamp series back to a copied version of the Pandas DataFrame. Previously, if the Pandas DataFrame was only a reference (e.g. a slice of another) each series will still get assigned back to the reference even if it is not a modified timestamp column. This caused the following warning "SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame."
## How was this patch tested?
existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20213 from BryanCutler/pyspark-createDataFrame-copy-slice-warn-SPARK-23018.
## What changes were proposed in this pull request?
It provides a better error message when doing `spark_session.createDataFrame(pandas_df)` with no schema and an error occurs in the schema inference due to incompatible types.
The Pandas column names are propagated down and the error message mentions which column had the merging error.
https://issues.apache.org/jira/browse/SPARK-22566
## How was this patch tested?
Manually in the `./bin/pyspark` console, and with new tests: `./python/run-tests`
<img width="873" alt="screen shot 2017-11-21 at 13 29 49" src="https://user-images.githubusercontent.com/3977115/33080121-382274e0-cecf-11e7-808f-057a65bb7b00.png">
I state that the contribution is my original work and that I license the work to the Apache Spark project under the project’s open source license.
Author: Guilherme Berger <gberger@palantir.com>
Closes#19792 from gberger/master.
## What changes were proposed in this pull request?
This PR wraps the `asNondeterministic` attribute in the wrapped UDF function to set the docstring properly.
```python
from pyspark.sql.functions import udf
help(udf(lambda x: x).asNondeterministic)
```
Before:
```
Help on function <lambda> in module pyspark.sql.udf:
<lambda> lambda
(END
```
After:
```
Help on function asNondeterministic in module pyspark.sql.udf:
asNondeterministic()
Updates UserDefinedFunction to nondeterministic.
.. versionadded:: 2.3
(END)
```
## How was this patch tested?
Manually tested and a simple test was added.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20173 from HyukjinKwon/SPARK-22901-followup.
## What changes were proposed in this pull request?
Add tests for using non deterministic UDFs in aggregate.
Update pandas_udf docstring w.r.t to determinism.
## How was this patch tested?
test_nondeterministic_udf_in_aggregate
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```
We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
```
This PR is to support it.
## How was this patch tested?
WIP
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20137 from gatorsmile/registerFunction.
## What changes were proposed in this pull request?
R Structured Streaming API for withWatermark, trigger, partitionBy
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#20129 from felixcheung/rwater.
## What changes were proposed in this pull request?
This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.
## How was this patch tested?
Added new Python unit tests using Array data.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.
## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19977 from maropu/SPARK-22771.
## What changes were proposed in this pull request?
This is a follow-up pr of #19587.
If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails by the following error because the `self` which is a subclass of `unittest.TestCase` in the UDF `check_records_per_batch` can't be pickled anymore.
```
PicklingError: Cannot pickle files that are not opened for reading: w
```
This changes the UDF not to refer the `self`.
## How was this patch tested?
Tested locally.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20115 from ueshin/issues/SPARK-22370_fup1.
## What changes were proposed in this pull request?
Escape of escape should be considered when using the UniVocity csv encoding/decoding library.
Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters
One option is added for reading and writing CSV: `escapeQuoteEscaping`
## How was this patch tested?
Unit test added.
Author: soonmok-kwon <soonmok.kwon@navercorp.com>
Closes#20004 from ep1804/SPARK-22818.
## What changes were proposed in this pull request?
In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.
This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.
## How was this patch tested?
Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col = udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello| 3| 13|
+-----+----+-------------+
```
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19929 from mgaido91/SPARK-22629.
## What changes were proposed in this pull request?
Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.
## How was this patch tested?
Added a test to `ArrowConvertersSuite`.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18754 from ueshin/issues/SPARK-21552.
## What changes were proposed in this pull request?
This is a follow-up pr of #20054 modifying error messages for both pandas and pyarrow to show actual versions.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20074 from ueshin/issues/SPARK-22874_fup1.
## What changes were proposed in this pull request?
Currently we check pandas version by capturing if `ImportError` for the specific imports is raised or not but we can compare `LooseVersion` of the version strings as the same as we're checking pyarrow version.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20054 from ueshin/issues/SPARK-22874.
## What changes were proposed in this pull request?
Upgrade Spark to Arrow 0.8.0 for Java and Python. Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.
The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:
* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python
## How was this patch tested?
Existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
## What changes were proposed in this pull request?
Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.
**BEFORE**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```
**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746
scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Pass the newly added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19975 from dongjoon-hyun/SPARK-22781.
## What changes were proposed in this pull request?
This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.
If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoints() fit very well.
Furthermore, at the moment Reliable checkpoints still incur double computation (see #9428)
In general it makes the API more complete as well.
## How was this patch tested?
Python land quick use case:
```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F
>>> def f(x):
sleep(1)
return x*2
...:
>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))
>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s
>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s
>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms
>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```
Author: Fernando Pereira <fernando.pereira@epfl.ch>
Closes#19805 from ferdonline/feature_dataset_localCheckpoint.
## What changes were proposed in this pull request?
In multiple text analysis problems, it is not often desirable for the rows to be split by "\n". There exists a wholeText reader for RDD API, and this JIRA just adds the same support for Dataset API.
## How was this patch tested?
Added relevant new tests for both scala and Java APIs
Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>
Closes#14151 from ScrapCodes/SPARK-16496/wholetext.
## What changes were proposed in this pull request?
When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.
For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.
The timestamp value from current `toPandas()` will be the following:
```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
| ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+
>>> df.toPandas()
ts
0 1970-01-01 17:00:01
```
As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior is a bug and the value should be `"1970-01-01 00:00:01"`.
## How was this patch tested?
Added tests and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19607 from ueshin/issues/SPARK-22395.
## What changes were proposed in this pull request?
In PySpark API Document, DataFrame.write.csv() says that setting the quote parameter to an empty string should turn off quoting. Instead, it uses the [null character](https://en.wikipedia.org/wiki/Null_character) as the quote.
This PR fixes the doc.
## How was this patch tested?
Manual.
```
cd python/docs
make html
open _build/html/pyspark.sql.html
```
Author: gaborgsomogyi <gabor.g.somogyi@gmail.com>
Closes#19814 from gaborgsomogyi/SPARK-22484.
## What changes were proposed in this pull request?
Besides conditional expressions such as `when` and `if`, users may want to conditionally execute python udfs by short-curcuit evaluation. We should also explicitly note that python udfs don't support this kind of conditional execution too.
## How was this patch tested?
N/A, just document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19787 from viirya/SPARK-22541.