## What changes were proposed in this pull request?
This fixes the case when calling `SparkSession.createDataFrame` with a Pandas DataFrame that has non-str column labels.
The column-name conversion logic to handle non-string or unicode (in Python 2) labels is:
```
if column is not any type of string:
name = str(column)
else if column is unicode in Python 2:
name = column.encode('utf-8')
```
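A minimal Python 3 sketch of this logic (the helper name is hypothetical; the Python 2 unicode branch is a no-op on Python 3):

```python
def to_column_name(column):
    # Hypothetical helper: non-string labels (e.g. ints from a default
    # Pandas RangeIndex) are stringified so Spark accepts them.
    if not isinstance(column, str):
        return str(column)
    # On Python 2, a unicode label would additionally be encoded:
    # column = column.encode('utf-8')
    return column
```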
## How was this patch tested?
Added a new test with a Pandas DataFrame that has int column labels
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20210 from BryanCutler/python-createDataFrame-int-col-error-SPARK-23009.
## What changes were proposed in this pull request?
This fixes createDataFrame from Pandas to only assign modified timestamp series back to a copied version of the Pandas DataFrame. Previously, if the Pandas DataFrame was only a reference (e.g. a slice of another), each series would still get assigned back to the reference even if it was not a modified timestamp column. This caused the following warning: "SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame."
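The pattern can be sketched with a plain dict standing in for the Pandas DataFrame (names hypothetical): the copy is made lazily, only once a timestamp series actually needs to be written back.

```python
def assign_converted_series(pdf, convert, ts_columns):
    # 'pdf' stands in for a Pandas DataFrame (here a dict of lists).
    # Only copy when the first timestamp column is modified; untouched
    # columns are never written back to the original reference, which
    # is what raised SettingWithCopyWarning before.
    pdf_copy = None
    for name, series in pdf.items():
        if name in ts_columns:
            if pdf_copy is None:
                pdf_copy = dict(pdf)  # stands in for pdf.copy()
            pdf_copy[name] = [convert(v) for v in series]
    return pdf_copy if pdf_copy is not None else pdf
```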
## How was this patch tested?
existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20213 from BryanCutler/pyspark-createDataFrame-copy-slice-warn-SPARK-23018.
## What changes were proposed in this pull request?
It provides a better error message when doing `spark_session.createDataFrame(pandas_df)` with no schema and an error occurs in the schema inference due to incompatible types.
The Pandas column names are propagated down and the error message mentions which column had the merging error.
https://issues.apache.org/jira/browse/SPARK-22566
## How was this patch tested?
Manually in the `./bin/pyspark` console, and with new tests: `./python/run-tests`
<img width="873" alt="screen shot 2017-11-21 at 13 29 49" src="https://user-images.githubusercontent.com/3977115/33080121-382274e0-cecf-11e7-808f-057a65bb7b00.png">
I state that the contribution is my original work and that I license the work to the Apache Spark project under the project’s open source license.
Author: Guilherme Berger <gberger@palantir.com>
Closes#19792 from gberger/master.
## What changes were proposed in this pull request?
This PR wraps the `asNondeterministic` attribute in the wrapped UDF function to set the docstring properly.
```python
from pyspark.sql.functions import udf
help(udf(lambda x: x).asNondeterministic)
```
Before:
```
Help on function <lambda> in module pyspark.sql.udf:
<lambda> lambda
(END
```
After:
```
Help on function asNondeterministic in module pyspark.sql.udf:
asNondeterministic()
Updates UserDefinedFunction to nondeterministic.
.. versionadded:: 2.3
(END)
```
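The usual way to get this behavior is `functools.wraps`; a sketch (names hypothetical) of how wrapping transfers the docstring that `help()` displays:

```python
import functools

def _as_nondeterministic():
    """Updates UserDefinedFunction to nondeterministic."""

def wrap_udf_method(impl):
    # functools.wraps copies __name__ and __doc__ from the template
    # function onto the wrapper, so help() shows the method's docstring
    # instead of the wrapped lambda's.
    @functools.wraps(_as_nondeterministic)
    def wrapper(*args, **kwargs):
        return impl(*args, **kwargs)
    return wrapper
```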
## How was this patch tested?
Manually tested and a simple test was added.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20173 from HyukjinKwon/SPARK-22901-followup.
## What changes were proposed in this pull request?
Add tests for using non-deterministic UDFs in aggregate.
Update the pandas_udf docstring w.r.t. determinism.
## How was this patch tested?
test_nondeterministic_udf_in_aggregate
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```
We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
```
This PR is to support it.
## How was this patch tested?
WIP
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20137 from gatorsmile/registerFunction.
## What changes were proposed in this pull request?
R Structured Streaming API for withWatermark, trigger, partitionBy
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#20129 from felixcheung/rwater.
## What changes were proposed in this pull request?
This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.
## How was this patch tested?
Added new Python unit tests using Array data.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
## What changes were proposed in this pull request?
This PR modifies `concat` to concatenate binary inputs into a single binary output.
`concat` in the current master always outputs data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.
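The coercion rule can be sketched in plain Python (function name hypothetical): only when every input is binary does the result stay binary.

```python
def concat_coerced(*args):
    # All-binary inputs yield a binary result; any string input makes
    # the whole concatenation a string, mirroring the PostgreSQL rule.
    if args and all(isinstance(a, (bytes, bytearray)) for a in args):
        return b"".join(bytes(a) for a in args)
    return "".join(str(a) for a in args)
```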
## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19977 from maropu/SPARK-22771.
## What changes were proposed in this pull request?
This is a follow-up pr of #19587.
If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails with the following error because `self` (an instance of a `unittest.TestCase` subclass) referenced in the UDF `check_records_per_batch` can't be pickled anymore.
```
PicklingError: Cannot pickle files that are not opened for reading: w
```
This changes the UDF not to refer to `self`.
## How was this patch tested?
Tested locally.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20115 from ueshin/issues/SPARK-22370_fup1.
## What changes were proposed in this pull request?
Escaping of the escape character should be considered when using the UniVocity CSV encoding/decoding library.
Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters
One option is added for reading and writing CSV: `escapeQuoteEscaping`
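A plain-Python sketch of why the option matters (helper and defaults hypothetical): inside a quoted field, a literal quote is escaped with the escape character, so a literal escape character needs its own escape, which is what `escapeQuoteEscaping` configures.

```python
def quote_field(value, quote='"', escape='\\', escape_quote_escaping='\\'):
    out = []
    for ch in value:
        if ch == escape:
            # a literal escape character must itself be escaped
            out.append(escape_quote_escaping + ch)
        elif ch == quote:
            # a literal quote is escaped with the escape character
            out.append(escape + ch)
        else:
            out.append(ch)
    return quote + "".join(out) + quote
```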
## How was this patch tested?
Unit test added.
Author: soonmok-kwon <soonmok.kwon@navercorp.com>
Closes#20004 from ep1804/SPARK-22818.
## What changes were proposed in this pull request?
In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for Python UDF. This flag is useful for cases when the UDF's code can return different results with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.
This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.
## How was this patch tested?
Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col = udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello| 3| 13|
+-----+----+-------------+
```
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19929 from mgaido91/SPARK-22629.
## What changes were proposed in this pull request?
Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.
## How was this patch tested?
Added a test to `ArrowConvertersSuite`.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18754 from ueshin/issues/SPARK-21552.
## What changes were proposed in this pull request?
This is a follow-up pr of #20054 modifying error messages for both pandas and pyarrow to show actual versions.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20074 from ueshin/issues/SPARK-22874_fup1.
## What changes were proposed in this pull request?
Currently we check the pandas version by catching `ImportError` for the specific imports, but we can compare the `LooseVersion` of the version strings, the same way we check the pyarrow version.
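A sketch of the version check under that approach (function name and minimum version hypothetical; the actual patch uses `LooseVersion`, a plain tuple comparison here):

```python
def require_minimum_pandas_version(actual, minimum="0.19.2"):
    # Compare dotted version strings numerically instead of only
    # catching ImportError for the specific imports; "0.19.10" must
    # compare greater than "0.19.2", which string comparison gets wrong.
    def parse(v):
        return tuple(int(part) for part in v.split("."))
    if parse(actual) < parse(minimum):
        raise ImportError(
            "Pandas >= %s must be installed; however, your version was %s."
            % (minimum, actual))
```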
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20054 from ueshin/issues/SPARK-22874.
## What changes were proposed in this pull request?
Upgrade Spark to Arrow 0.8.0 for Java and Python. Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.
The highlights that pertain to Spark for the update from Arrow version 0.4.1 to 0.8.0 include:
* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python
## How was this patch tested?
Existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
## What changes were proposed in this pull request?
Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.
**BEFORE**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```
**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@678b3746
scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Pass the newly added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19975 from dongjoon-hyun/SPARK-22781.
## What changes were proposed in this pull request?
This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.
If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, `localCheckpoint()` fits very well.
Furthermore, at the moment reliable checkpoints still incur double computation (see #9428).
In general it makes the API more complete as well.
## How was this patch tested?
Python land quick use case:
```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F
>>> def f(x):
sleep(1)
return x*2
...:
>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))
>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s
>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s
>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms
>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```
Author: Fernando Pereira <fernando.pereira@epfl.ch>
Closes#19805 from ferdonline/feature_dataset_localCheckpoint.
## What changes were proposed in this pull request?
In multiple text analysis problems, it is often not desirable for the rows to be split by "\n". There exists a wholeText reader for the RDD API, and this JIRA just adds the same support for the Dataset API.
## How was this patch tested?
Added relevant new tests for both scala and Java APIs
Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>
Closes#14151 from ScrapCodes/SPARK-16496/wholetext.
## What changes were proposed in this pull request?
When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.
For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.
The timestamp value from current `toPandas()` will be the following:
```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
| ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+
>>> df.toPandas()
ts
0 1970-01-01 17:00:01
```
As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior a bug, and the value should be `"1970-01-01 00:00:01"`.
## How was this patch tested?
Added tests and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19607 from ueshin/issues/SPARK-22395.
## What changes were proposed in this pull request?
In PySpark API Document, DataFrame.write.csv() says that setting the quote parameter to an empty string should turn off quoting. Instead, it uses the [null character](https://en.wikipedia.org/wiki/Null_character) as the quote.
This PR fixes the doc.
## How was this patch tested?
Manual.
```
cd python/docs
make html
open _build/html/pyspark.sql.html
```
Author: gaborgsomogyi <gabor.g.somogyi@gmail.com>
Closes#19814 from gaborgsomogyi/SPARK-22484.
## What changes were proposed in this pull request?
Besides conditional expressions such as `when` and `if`, users may want to conditionally execute Python UDFs by short-circuit evaluation. We should also explicitly note that Python UDFs don't support this kind of conditional execution either.
## How was this patch tested?
N/A, just document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19787 from viirya/SPARK-22541.
## What changes were proposed in this pull request?
* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUDFType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"
Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1
```
## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit
## How was this patch tested?
Added PandasUDFTests
## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19630 from icexelloss/spark-22409-pandas-udf-type.
## What changes were proposed in this pull request?
In the PySpark API Document, [SparkSession.builder](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html) is not documented and shows a default value description.
```
SparkSession.builder = <pyspark.sql.session.Builder object ...
```
This PR adds the doc.
![screen](https://user-images.githubusercontent.com/9700541/32705514-1bdcafaa-c7ca-11e7-88bf-05566fea42de.png)
The following is the diff of the generated result.
```
$ diff old.html new.html
95a96,101
> <dl class="attribute">
> <dt id="pyspark.sql.SparkSession.builder">
> <code class="descname">builder</code><a class="headerlink" href="#pyspark.sql.SparkSession.builder" title="Permalink to this definition">¶</a></dt>
> <dd><p>A class attribute having a <a class="reference internal" href="#pyspark.sql.SparkSession.Builder" title="pyspark.sql.SparkSession.Builder"><code class="xref py py-class docutils literal"><span class="pre">Builder</span></code></a> to construct <a class="reference internal" href="#pyspark.sql.SparkSession" title="pyspark.sql.SparkSession"><code class="xref py py-class docutils literal"><span class="pre">SparkSession</span></code></a> instances</p>
> </dd></dl>
>
212,216d217
< <dt id="pyspark.sql.SparkSession.builder">
< <code class="descname">builder</code><em class="property"> = <pyspark.sql.session.SparkSession.Builder object></em><a class="headerlink" href="#pyspark.sql.SparkSession.builder" title="Permalink to this definition">¶</a></dt>
< <dd></dd></dl>
<
< <dl class="attribute">
```
## How was this patch tested?
Manual.
```
cd python/docs
make html
open _build/html/pyspark.sql.html
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19726 from dongjoon-hyun/SPARK-22490.
## What changes were proposed in this pull request?
If schema is passed as a list of unicode strings for column names, they should be re-encoded to 'utf-8' to be consistent. This is similar to the #13097 but for creation of DataFrame using Arrow.
## How was this patch tested?
Added new test of using unicode names for schema.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19738 from BryanCutler/arrow-createDataFrame-followup-unicode-SPARK-20791.
## What changes were proposed in this pull request?
This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.
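The slicing step can be sketched in plain Python (names hypothetical): the input rows are cut into roughly equal chunks, one Arrow record batch per partition.

```python
def slice_by_parallelism(rows, parallelism):
    # Ceiling division so all rows are covered and the chunk count
    # does not exceed the requested parallelism.
    step = -(-len(rows) // parallelism)
    return [rows[i:i + step] for i in range(0, len(rows), step)]
```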
## How was this patch tested?
Added new unit test to create DataFrame with and without the optimization enabled, then compare results.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19459 from BryanCutler/arrow-createDataFrame-from_pandas-SPARK-20791.
## What changes were proposed in this pull request?
This PR proposes to add `errorifexists` to the SparkR API and fix the rest of the mode descriptions, mainly in the API documentation, as well.
This PR also replaces `convertToJSaveMode` with `setWriteMode` so that the string is passed as-is to the JVM and executes:
b034f2565f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (L72-L82)
and remove the duplication here:
3f958a9992/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala (L187-L194)
## How was this patch tested?
Manually checked the built documentation. These were mainly found by `` grep -r `error` `` and `grep -r 'error'`.
Also, unit tests added in `test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19673 from HyukjinKwon/SPARK-21640-followup.
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.
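The convention differs from Python's `date.weekday()` (where 0 represents Monday); a sketch of the mapping:

```python
from datetime import date

def dayofweek(d):
    # SQL convention: 1 = Sunday, 2 = Monday, ..., 7 = Saturday;
    # date.weekday() uses 0 = Monday ... 6 = Sunday.
    return (d.weekday() + 1) % 7 + 1
```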
## How was this patch tested?
Unit tests and manual tests.
Author: ptkool <michael.styles@shopify.com>
Closes#19672 from ptkool/day_of_week_function.
## What changes were proposed in this pull request?
Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]' when converted to a Spark DataFrame with `createDataFrame` will interpret the values as LongType. This fix will check for a timestamp type and convert it to microseconds which will allow Spark to read as TimestampType.
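The unit conversion at the heart of the fix, sketched in plain Python (function name hypothetical): 'datetime64[ns]' values are int64 nanoseconds, while Spark's TimestampType stores microseconds.

```python
def ns_to_us(ns_values):
    # Divide nanoseconds down to microseconds so Spark reads the column
    # as TimestampType instead of interpreting raw int64 as LongType.
    return [v // 1000 for v in ns_values]
```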
## How was this patch tested?
Added unit test to verify Spark schema is expected for TimestampType and DateType when created from pandas
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19646 from BryanCutler/pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417.
## What changes were proposed in this pull request?
When writing via JDBC with Python, we currently assign `None` as the write mode by default. This is due to wrongly calling `mode` on the `_jwrite` object instead of `self`, and it causes an exception.
## How was this patch tested?
manual tests
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19654 from mgaido91/SPARK-22437.
## What changes were proposed in this pull request?
Under the current execution mode of Python UDFs, we don't fully support Python UDFs as branch values or else values in a CaseWhen expression.
Since fixing it might require a non-trivial change (e.g., #19592) and this issue has a simpler workaround, we should just notify users about this in the documentation.
## How was this patch tested?
Only document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19617 from viirya/SPARK-22347-3.
## What changes were proposed in this pull request?
This PR propose to add `ReusedSQLTestCase` which deduplicate `setUpClass` and `tearDownClass` in `sql/tests.py`.
## How was this patch tested?
Jenkins tests and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19595 from HyukjinKwon/reduce-dupe.
## What changes were proposed in this pull request?
`ArrowEvalPythonExec` and `FlatMapGroupsInPandasExec` are referring to config values of `SQLConf` in functions passed to `mapPartitions`/`mapPartitionsInternal`, but we should capture them in the driver.
## How was this patch tested?
Added a test and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19587 from ueshin/issues/SPARK-22370.
## What changes were proposed in this pull request?
Adding date and timestamp support with Arrow for `toPandas()` and `pandas_udf`s. Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive localized to the Python system timezone.
## How was this patch tested?
Added Scala tests for date and timestamp types under ArrowConverters, ArrowUtils, and ArrowWriter suites. Added Python tests for `toPandas()` and `pandas_udf`s with date and timestamp types.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18664 from BryanCutler/arrow-date-timestamp-SPARK-21375.
## What changes were proposed in this pull request?
This PR proposes to mark the existing warnings as `DeprecationWarning` and print out warnings for deprecated functions.
This could be actually useful for Spark app developers. I use (old) PyCharm and this IDE can detect this specific `DeprecationWarning` in some cases:
**Before**
<img src="https://user-images.githubusercontent.com/6477701/31762664-df68d9f8-b4f6-11e7-8773-f0468f70a2cc.png" height="45" />
**After**
<img src="https://user-images.githubusercontent.com/6477701/31762662-de4d6868-b4f6-11e7-98dc-3c8446a0c28a.png" height="70" />
For console usage, `DeprecationWarning` is usually disabled (see https://docs.python.org/2/library/warnings.html#warning-categories and https://docs.python.org/3/library/warnings.html#warning-categories):
```
>>> import warnings
>>> filter(lambda f: f[2] == DeprecationWarning, warnings.filters)
[('ignore', <_sre.SRE_Pattern object at 0x10ba58c00>, <type 'exceptions.DeprecationWarning'>, <_sre.SRE_Pattern object at 0x10bb04138>, 0), ('ignore', None, <type 'exceptions.DeprecationWarning'>, None, 0)]
```
so, it won't actually mess up the terminal much unless it is intended.
If this is intentionally enabled, it'd show as below:
```
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>>
>>> from pyspark.sql import functions
>>> functions.approxCountDistinct("a")
.../spark/python/pyspark/sql/functions.py:232: DeprecationWarning: Deprecated in 2.1, use approx_count_distinct instead.
"Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
...
```
These instances were found by:
```
cd python/pyspark
grep -r "Deprecated" .
grep -r "deprecated" .
grep -r "deprecate" .
```
## How was this patch tested?
Manually tested.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19535 from HyukjinKwon/deprecated-warning.
## What changes were proposed in this pull request?
This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19517 from ueshin/issues/SPARK-20396/fup2.
## What changes were proposed in this pull request?
Currently percentile_approx never returns the first element when the percentile is in (relativeError, 1/N], where relativeError defaults to 1/10000 and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer.
For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2.
Based on the paper, targetError is not rounded up, and the search index should start from 0 instead of 1. By following the paper, we should be able to fix the cases mentioned above.
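A simplified exact counterpart of the fixed behavior (function name hypothetical): starting the search at index 0 means any percentile in (0, 1/N] maps to the first element.

```python
def exact_percentile(values, p):
    # The (i+1)-th smallest element covers cumulative fractions up to
    # (i+1)/n, so p = 0.1 over 1..10 returns 1, not 2.
    s = sorted(values)
    n = len(s)
    for i, v in enumerate(s):
        if (i + 1) / n >= p:
            return v
    return s[-1]
```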
## How was this patch tested?
Added a new test case and fix existing test cases.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19438 from wzhfy/improve_percentile_approx.
## What changes were proposed in this pull request?
This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.
Static schema
-------------------
```
schema = df.schema

@pandas_udf(schema)
def normalize(df):
    df = df.assign(v1=(df.v1 - df.v1.mean()) / df.v1.std())
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**
Another example to use pd.DataFrame dtypes as output schema of the udf:
```
sample_df = df.filter(df.id == 1).toPandas()
def foo(df):
ret = # Some transformation on the input pd.DataFrame
return ret
foo_udf = pandas_udf(foo, foo(sample_df).dtypes)
df.groupBy('id').apply(foo_udf)
```
In interactive use cases, users usually have a sample pd.DataFrame to test the function `foo` in their notebook. Being able to use `foo(sample_df).dtypes` frees users from specifying the output schema of `foo`.
Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md
## How was this patch tested?
* Added GroupbyApplyTest
Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#18732 from icexelloss/groupby-apply-SPARK-20396.
## What changes were proposed in this pull request?
This is a follow-up of #19384.
In the previous pr, only definitions of the config names were modified, but we also need to modify the names in runtime or tests specified as string literal.
## How was this patch tested?
Existing tests but modified the config names.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19462 from ueshin/issues/SPARK-22159/fup1.
## What changes were proposed in this pull request?
Fixed some minor issues with pandas_udf related docs and formatting.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19375 from BryanCutler/arrow-pandas_udf-cleanup-minor.
## What changes were proposed in this pull request?
We added a method to the Scala API for creating a `DataFrame` from a `Dataset[String]` storing CSV in [SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463), but PySpark doesn't have `Dataset` to support this feature. Therefore, I add an API to create a `DataFrame` from an `RDD[String]` storing CSV, which is also consistent with PySpark's `spark.read.json`.
For example as below
```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```
## How was this patch tested?
add unit test cases.
Author: goldmedal <liugs963@gmail.com>
Closes#19339 from goldmedal/SPARK-22112.
## What changes were proposed in this pull request?
This change disables the use of 0-parameter pandas_udfs because the API was overly complex and awkward; it can easily be worked around by using an index column as an input argument. Also added doctests for pandas_udfs, which revealed bugs in handling empty partitions and in using the pandas_udf decorator.
## How was this patch tested?
Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
## What changes were proposed in this pull request?
The `percentile_approx` function previously accepted numeric type input and output double type results.
But since all numeric types, date and timestamp types are represented as numerics internally, `percentile_approx` can support them easily.
After this PR, it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
This change is also required when we generate equi-height histograms for these types.
## How was this patch tested?
Added a new test and modified some existing tests.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19321 from wzhfy/approx_percentile_support_types.
## What changes were proposed in this pull request?
When calling `DataFrame.toPandas()` (without Arrow enabled), if there is an `IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null values, the following exception is thrown:
`ValueError: Cannot convert non-finite values (NA or inf) to integer`
This is because the null values first get converted to float NaN during the construction of the Pandas DataFrame in `from_records`, and then the column is attempted to be converted back to an integer, where it fails.
The fix checks whether the Pandas DataFrame can cause such a failure when converting; if so, we don't do the conversion and use the type inferred by Pandas.
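The failure mode and the guard can be sketched without Pandas (names hypothetical): nulls surface as float NaN, and NaN cannot be cast back to int.

```python
def cast_back_to_int(values):
    # Nulls become float NaN; if any are present, keep the float values
    # (the dtype Pandas would infer) instead of attempting the int cast,
    # which raises "Cannot convert non-finite values (NA or inf) to integer".
    floats = [float("nan") if v is None else float(v) for v in values]
    if any(f != f for f in floats):  # NaN is the only value unequal to itself
        return floats
    return [int(f) for f in floats]
```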
Closes#18945
## How was this patch tested?
Added pyspark test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19319 from viirya/SPARK-21766.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
@pandas_udf(DoubleType())
def plus(a, b):
    return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
@pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example
## How was this patch tested?
Doc only change / existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19276 from srowen/SPARK-22049.
## What changes were proposed in this pull request?
`StructType.fromInternal` calls `f.fromInternal(v)` for every field.
We can use precalculated type information to limit the number of function calls. (It's calculated once per StructType and used in per-record calculations.)
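The idea can be sketched as (names hypothetical): compute once, per StructType, the list of fields that actually need conversion, so the per-record path skips the no-op `fromInternal` calls.

```python
def make_from_internal(fields):
    # fields: list of (needs_conversion, converter) pairs, one per column.
    # The filtering below happens once per StructType, not per record.
    converters = [(i, conv) for i, (needed, conv) in enumerate(fields)
                  if needed]
    if not converters:
        # fast path: no field needs conversion at all
        return list
    def from_internal(record):
        out = list(record)
        for i, conv in converters:
            out[i] = conv(out[i])
        return out
    return from_internal
```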
Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
10000000 253.417 0.000 486.991 0.000 types.py:619(<listcomp>)
30000000 192.272 0.000 1009.986 0.000 types.py:612(fromInternal)
100000000 176.140 0.000 176.140 0.000 types.py:88(fromInternal)
20000000 156.832 0.000 328.093 0.000 types.py:1471(_create_row)
14000 107.206 0.008 1237.917 0.088 {built-in method loads}
20000000 80.176 0.000 1090.162 0.000 types.py:1468(<lambda>)
```
After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
30000000 215.845 0.000 698.748 0.000 types.py:612(fromInternal)
20000000 165.042 0.000 351.572 0.000 types.py:1471(_create_row)
14000 116.834 0.008 946.791 0.068 {built-in method loads}
20000000 87.326 0.000 786.073 0.000 types.py:1468(<lambda>)
20000000 85.477 0.000 134.607 0.000 types.py:1519(__new__)
10000000 65.777 0.000 126.712 0.000 types.py:619(<listcomp>)
```
The main difference is types.py:619(<listcomp>) and types.py:88(fromInternal) (the latter is removed in After).
The number of function calls is 100 million lower, and performance is about 20% better.
Benchmark (worst case scenario.)
Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```
After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```
IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement will be even greater.
## How was this patch tested?
Existing tests.
Performance benchmark.
Author: Maciej Bryński <maciek-github@brynski.pl>
Closes#19249 from maver1ck/spark_22032.
## What changes were proposed in this pull request?
In previous work (SPARK-21513), we allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string, but only for the Scala API. In this follow-up PR, we make Spark SQL support it for PySpark and SparkR, too. We also fix some small bugs and comments from the previous work.
### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice"}')]
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob') as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.
cc viirya HyukjinKwon
Author: goldmedal <liugs963@gmail.com>
Closes#19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.