### What changes were proposed in this pull request?
The Experimental and Evolving annotations are both (like Unstable) used to express that a an API may change. However there are many things in the code that have been marked that way since even Spark 1.x. Per the dev thread, anything introduced at or before Spark 2.3.0 is pretty much 'stable' in that it would not change without a deprecation cycle. Therefore I'd like to remove most of these annotations. And, remove the `:: Experimental ::` scaladoc tag too. And likewise for Python, R.
The changes below can be summarized as:
- Generally, anything introduced at or before Spark 2.3.0 has been unmarked as neither Evolving nor Experimental
- Obviously experimental items like DSv2, Barrier mode, ExperimentalMethods are untouched
- I _did_ unmark a few MLlib classes introduced in 2.4, as I am quite confident they're not going to change (e.g. KolmogorovSmirnovTest, PowerIterationClustering)
It's a big change to review, so I'd suggest scanning the list of _files_ changed to see if any area seems like it should remain partly experimental and examine those.
### Why are the changes needed?
Many of these annotations are incorrect; the APIs are de facto stable. Leaving them also makes legitimate usages of the annotations less meaningful.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#25558 from srowen/SPARK-28855.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
The parameters doc string of the function format_string was changed from _col_, _d_ to _format_, _cols_ which is what the actual function declaration states
### Why are the changes needed?
The parameters stated by the documentation was inaccurate
### Does this PR introduce any user-facing change?
Yes.
**BEFORE**
![before](https://user-images.githubusercontent.com/9700541/63310013-e21a0e80-c2ad-11e9-806b-1d272c5cde12.png)
**AFTER**
![after](https://user-images.githubusercontent.com/9700541/63315812-6b870c00-c2c1-11e9-8165-82782628cd1a.png)
### How was this patch tested?
N/A: documentation only
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
Closes#25506 from darrentirto/SPARK-28777.
Authored-by: darrentirto <darrentirto@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In the PR, I propose to use `uuuu` for years instead of `yyyy` in date/timestamp patterns without the era pattern `G` (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). **Parsing/formatting of positive years (current era) will be the same.** The difference is in formatting negative years belong to previous era - BC (Before Christ).
I replaced the `yyyy` pattern by `uuuu` everywhere except:
1. Test, Suite & Benchmark. Existing tests must work as is.
2. `SimpleDateFormat` because it doesn't support the `uuuu` pattern.
3. Comments and examples (except comments related to already replaced patterns).
Before the changes, the year of common era `100` and the year of BC era `-99`, showed similarly as `100`. After the changes negative years will be formatted with the `-` sign.
Before:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+----------+
| value|
+----------+
|0100-01-01|
+----------+
```
After:
```Scala
scala> Seq(java.time.LocalDate.of(-99, 1, 1)).toDF().show
+-----------+
| value|
+-----------+
|-0099-01-01|
+-----------+
```
## How was this patch tested?
By existing test suites, and added tests for negative years to `DateFormatterSuite` and `TimestampFormatterSuite`.
Closes#25230 from MaxGekk/year-pattern-uuuu.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This adds simple check for `count` argument:
- If it is a `Column` we apply `_to_java_column` before invoking JVM counterpart
- Otherwise we proceed as before.
## How was this patch tested?
Manual testing.
Closes#25193 from zero323/SPARK-28278.
Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
In the PR, I propose to convert options values to strings by using `to_str()` for the following functions: `from_csv()`, `to_csv()`, `from_json()`, `to_json()`, `schema_of_csv()` and `schema_of_json()`. This will make handling of function options consistent to option handling in `DataFrameReader`/`DataFrameWriter`.
For example:
```Python
df.select(from_csv(df.value, "s string", {'ignoreLeadingWhiteSpace': True})
```
## How was this patch tested?
Added an example for `from_csv()` which was tested by:
```Shell
./python/run-tests --testnames pyspark.sql.functions
```
Closes#25182 from MaxGekk/options_to_str.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type .
Had an offline discussion with rxin, mengxr and cloud-fan
The reason is basically:
1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764
Renaming `SCALAR_ITER` as `MAP_ITER` is abandoned due to 2. reason.
For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic with `mapPartitionsInPandas`.
## How was this patch tested?
Existing tests should cover.
Closes#25044 from HyukjinKwon/SPARK-28198.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not guaranteed. I will submit another PR to add the same to user guide, just to keep this PR minimal.
I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.
cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123
![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)
## How was this patch tested?
doctest
Closes#25005 from mengxr/SPARK-28056.2.
Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series.
Note the UDF input args will be always one iterator:
* if the udf take only column as input, the iterator's element will be pd.Series (corresponding to the column values batch)
* if the udf take multiple columns as inputs, the iterator's element will be a tuple composed of multiple `pd.Series`s, each one corresponding to the multiple columns as inputs (keep the same order). For example:
```
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
for col1_batch, col2_batch in iterator:
yield col1_batch + col2_batch
df.select(the_udf("col1", "col2"))
```
The udf above will add col1 and col2.
I haven't add unit tests, but manually tests show it works fine. So it is ready for first pass review.
We can test several typical cases:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext
df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
pid = TaskContext.get().partitionId()
print("DBG: fi1: do init stuff, partitionId=" + str(pid))
for batch in it:
yield batch + 100
print("DBG: fi1: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
pid = TaskContext.get().partitionId()
print("DBG: fi2: do init stuff, partitionId=" + str(pid))
for batch in it:
yield batch + 10000
print("DBG: fi2: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
pid = TaskContext.get().partitionId()
print("DBG: fi3: do init stuff, partitionId=" + str(pid))
for x, y in it:
yield x + y * 10 + 100000
print("DBG: fi3: do close stuff, partitionId=" + str(pid))
pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
return x + 1000
udf("int")
def fu1(x):
return x + 10
# test select "pandas iter udf/pandas udf/sql udf" expressions at the same time.
# Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# and `fu1("a")`, `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()
# test chain two pandas iter udf together
# Note this case `fi2(fi1("a"))` will generate only one plan
# Also note the init stuff/close stuff call order will be like:
# (debug output following)
# DBG: fi2: do init stuff, partitionId=0
# DBG: fi1: do init stuff, partitionId=0
# DBG: fi1: do close stuff, partitionId=0
# DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()
# test more complex chain
# Note this case `fi1("a"), fi2("a")` will generate one plan,
# and `fi3(fi1_output, fi2_output)` will generate another plan
df.select(fi3(fi1("a"), fi2("a"))).show()
```
## How was this patch tested?
To be added.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#24643 from WeichenXu123/pandas_udf_iter.
Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?
This PR addresses SPARK-23619: https://issues.apache.org/jira/browse/SPARK-23619
It adds additional comments indicating the default column names for the `explode` and `posexplode`
functions in Spark-SQL.
Functions for which comments have been updated so far:
* stack
* inline
* explode
* posexplode
* explode_outer
* posexplode_outer
## How was this patch tested?
This is just a change in the comments. The package builds and tests successfullly after the change.
Closes#23748 from jashgala/SPARK-23619.
Authored-by: Jash Gala <jashgala@amazon.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp`, and disable them by default. The functions can be enabled back via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any calls of the functions throw an analysis exception.
One of the reason for deprecation is functions violate semantic of `TimestampType` which is number of microseconds since epoch in UTC time zone. Shifting microseconds since epoch by time zone offset doesn't make sense because the result doesn't represent microseconds since epoch in UTC time zone any more, and cannot be considered as `TimestampType`.
## How was this patch tested?
The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.
Closes#24195 from MaxGekk/conv-utc-timestamp-deprecate.
Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
In the PR, I propose to deprecate the `from_utc_timestamp()` and `to_utc_timestamp`, and disable them by default. The functions can be enabled back via the SQL config `spark.sql.legacy.utcTimestampFunc.enabled`. By default, any calls of the functions throw an analysis exception.
One of the reason for deprecation is functions violate semantic of `TimestampType` which is number of microseconds since epoch in UTC time zone. Shifting microseconds since epoch by time zone offset doesn't make sense because the result doesn't represent microseconds since epoch in UTC time zone any more, and cannot be considered as `TimestampType`.
## How was this patch tested?
The changes were tested by `DateExpressionsSuite` and `DateFunctionsSuite`.
Closes#24195 from MaxGekk/conv-utc-timestamp-deprecate.
Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
The hashSeed method allocates 64 bytes instead of 8. Other bytes are always zeros (thanks to default behavior of ByteBuffer). And they could be excluded from hash calculation because they don't differentiate inputs.
## How was this patch tested?
By running the existing tests - XORShiftRandomSuite
Closes#20793 from MaxGekk/hash-buff-size.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This introduces a new SQL function 'xxhash64' for getting a 64-bit hash of an arbitrary number of columns.
This is designed to exactly mimic the 32-bit `hash`, which uses
MurmurHash3. The name is designed to be more future-proof than the
'hash', by indicating the exact algorithm used, similar to md5 and the
sha hashes.
## How was this patch tested?
The tests for the existing `hash` function were duplicated to run with `xxhash64`.
Closes#24019 from huonw/hash64.
Authored-by: Huon Wilson <Huon.Wilson@data61.csiro.au>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/23882 to handle binary math/string functions. For instance, see the cases below:
**Before:**
```python
>>> from pyspark.sql.functions import lit, ascii
>>> spark.range(1).select(lit('a').alias("value")).select(ascii("value"))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/functions.py", line 51, in _
jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/.../spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 332, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.ascii. Trace:
py4j.Py4JException: Method ascii([class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:276)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```
```python
>>> from pyspark.sql.functions import atan2
>>> spark.range(1).select(atan2("id", "id"))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/functions.py", line 78, in _
jc = getattr(sc._jvm.functions, name)(col1._jc if isinstance(col1, Column) else float(col1),
ValueError: could not convert string to float: id
```
**After:**
```python
>>> from pyspark.sql.functions import lit, ascii
>>> spark.range(1).select(lit('a').alias("value")).select(ascii("value"))
DataFrame[ascii(value): int]
```
```python
>>> from pyspark.sql.functions import atan2
>>> spark.range(1).select(atan2("id", "id"))
DataFrame[ATAN2(id, id): double]
```
Note that,
- This PR causes a slight behaviour changes for math functions. For instance, numbers as strings (e.g., `"1"`) were supported as arguments of binary math functions before. After this PR, it recognises it as column names.
- I also intentionally didn't document this behaviour changes since we're going ahead for Spark 3.0 and I don't think numbers as strings make much sense in math functions.
- There is another exception `when`, which takes string as literal values as below. This PR doeesn't fix this ambiguity.
```python
>>> spark.range(1).select(when(lit(True), col("id"))).show()
```
```
+--------------------------+
|CASE WHEN true THEN id END|
+--------------------------+
| 0|
+--------------------------+
```
```python
>>> spark.range(1).select(when(lit(True), "id")).show()
```
```
+--------------------------+
|CASE WHEN true THEN id END|
+--------------------------+
| id|
+--------------------------+
```
This PR also fixes as below:
https://github.com/apache/spark/pull/23882 fixed it to:
- Rename `_create_function` to `_create_name_function`
- Define new `_create_function` to take strings as column names.
This PR, I proposes to:
- Revert `_create_name_function` name to `_create_function`.
- Define new `_create_function_over_column` to take strings as column names.
## How was this patch tested?
Some unit tests were added for binary math / string functions.
Closes#24121 from HyukjinKwon/SPARK-26979.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Most SQL functions defined in `spark.sql.functions` have two calling patterns, one with a Column object as input, and another with a string representing a column name, which is then converted into a Column object internally.
There are, however, a few notable exceptions:
- lower()
- upper()
- abs()
- bitwiseNOT()
- ltrim()
- rtrim()
- trim()
- ascii()
- base64()
- unbase64()
While this doesn't break anything, as you can easily create a Column object yourself prior to passing it to one of these functions, it has two undesirable consequences:
1. It is surprising - it breaks coder's expectations when they are first starting with Spark. Every API should be as consistent as possible, so as to make the learning curve smoother and to reduce causes for human error;
2. It gets in the way of stylistic conventions. Most of the time it makes Python code more readable to use literal names, and the API provides ample support for that, but these few exceptions prevent this pattern from being universally applicable.
This patch is meant to fix the aforementioned problem.
### Effect
This patch **enables** support for passing column names as input to those functions mentioned above.
### Side effects
This PR also **fixes** an issue with some functions being defined multiple times by using `_create_function()`.
### How it works
`_create_function()` was redefined to always convert the argument to a Column object. The old implementation has been kept under `_create_name_function()`, and is still being used to generate the following special functions:
- lit()
- col()
- column()
- asc()
- desc()
- asc_nulls_first()
- asc_nulls_last()
- desc_nulls_first()
- desc_nulls_last()
This is because these functions can only take a column name as their argument. This is not a problem, as their semantics require so.
## How was this patch tested?
Ran ./dev/run-tests and tested it manually.
Closes#23882 from asmello/col-name-support-pyspark.
Authored-by: André Sá de Mello <amello@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This change adds support for returning StructType from a scalar Pandas UDF, where the return value of the function is a pandas.DataFrame. Nested structs are not supported and an error will be raised, child types can be any other type currently supported.
## How was this patch tested?
Added additional unit tests to `test_pandas_udf_scalar`
Closes#23900 from BryanCutler/pyspark-support-scalar_udf-StructType-SPARK-23836.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
Change aligns argument name with that in Scala version and documentation.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23357 from deepyaman/patch-1.
Authored-by: deepyaman <deepyaman.datta@utexas.edu>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose to switch the `DateFormatClass`, `ToUnixTimestamp`, `FromUnixTime`, `UnixTime` on java.time API for parsing/formatting dates and timestamps. The API has been already implemented by the `Timestamp`/`DateFormatter` classes. One of benefit is those classes support parsing timestamps with microsecond precision. Old behaviour can be switched on via SQL config: `spark.sql.legacy.timeParser.enabled` (`false` by default).
## How was this patch tested?
It was tested by existing test suites - `DateFunctionsSuite`, `DateExpressionsSuite`, `JsonSuite`, `CsvSuite`, `SQLQueryTestSuite` as well as PySpark tests.
Closes#23358 from MaxGekk/new-time-cast.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR implements a new feature - window aggregation Pandas UDF for bounded window.
#### Doc:
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#heading=h.c87w44wcj3wj
#### Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window
df = spark.range(0, 10, 2).toDF('v')
w1 = Window.partitionBy().orderBy('v').rangeBetween(-2, 4)
w2 = Window.partitionBy().orderBy('v').rowsBetween(-2, 2)
pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
df.withColumn('v_mean', avg(df['v']).over(w1)).show()
# +---+------+
# | v|v_mean|
# +---+------+
# | 0| 1.0|
# | 2| 2.0|
# | 4| 4.0|
# | 6| 6.0|
# | 8| 7.0|
# +---+------+
df.withColumn('v_mean', avg(df['v']).over(w2)).show()
# +---+------+
# | v|v_mean|
# +---+------+
# | 0| 2.0|
# | 2| 3.0|
# | 4| 4.0|
# | 6| 5.0|
# | 8| 6.0|
# +---+------+
```
#### High level changes:
This PR modifies the existing WindowInPandasExec physical node to deal with unbounded (growing, shrinking and sliding) windows.
* `WindowInPandasExec` now share the same base class as `WindowExec` and share utility functions. See `WindowExecBase`
* `WindowFunctionFrame` now has two new functions `currentLowerBound` and `currentUpperBound` - to return the lower and upper window bound for the current output row. It is also modified to allow `AggregateProcessor` == null. Null aggregator processor is used for `WindowInPandasExec` where we don't have an aggregator and only uses lower and upper bound functions from `WindowFunctionFrame`
* The biggest change is in `WindowInPandasExec`, where it is modified to take `currentLowerBound` and `currentUpperBound` and write those values together with the input data to the python process for rolling window aggregation. See `WindowInPandasExec` for more details.
#### Discussion
In benchmarking, I found numpy variant of the rolling window UDF is much faster than the pandas version:
Spark SQL window function: 20s
Pandas variant: ~80s
Numpy variant: 10s
Numpy variant with numba: 4s
Allowing numpy variant of the vectorized UDFs is something I want to discuss because of the performance improvement, but doesn't have to be in this PR.
## How was this patch tested?
New tests
Closes#22305 from icexelloss/SPARK-24561-bounded-window-udf.
Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Currently duplicated map keys are not handled consistently. For example, map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc.
This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway.
updated functions: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.
For other places:
1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1.
4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write map with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have map with duplicated keys. This can be done in followup.
## How was this patch tested?
updated tests and new tests
Closes#23124 from cloud-fan/map.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
The following 5 functions were removed from branch-2.4:
- map_entries
- map_filter
- transform_values
- transform_keys
- map_zip_with
We should update the since version to 3.0.0.
## How was this patch tested?
Existing tests.
Closes#23082 from ueshin/issues/SPARK-26112/since.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
- Remove some AccumulableInfo .apply() methods
- Remove non-label-specific multiclass precision/recall/fScore in favor of accuracy
- Remove toDegrees/toRadians in favor of degrees/radians (SparkR: only deprecated)
- Remove approxCountDistinct in favor of approx_count_distinct (SparkR: only deprecated)
- Remove unused Python StorageLevel constants
- Remove Dataset unionAll in favor of union
- Remove unused multiclass option in libsvm parsing
- Remove references to deprecated spark configs like spark.yarn.am.port
- Remove TaskContext.isRunningLocally
- Remove ShuffleMetrics.shuffle* methods
- Remove BaseReadWrite.context in favor of session
- Remove Column.!== in favor of =!=
- Remove Dataset.explode
- Remove Dataset.registerTempTable
- Remove SQLContext.getOrCreate, setActive, clearActive, constructors
Not touched yet
- everything else in MLLib
- HiveContext
- Anything deprecated more recently than 2.0.0, generally
## How was this patch tested?
Existing tests
Closes#22921 from srowen/SPARK-25908.
Lead-authored-by: Sean Owen <sean.owen@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
New functions takes a struct and converts it to a CSV strings using passed CSV options. It accepts the same CSV options as CSV data source does.
## How was this patch tested?
Added `CsvExpressionsSuite`, `CsvFunctionsSuite` as well as R, Python and SQL tests similar to tests for `to_json()`
Closes#22626 from MaxGekk/to_csv.
Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In the PR, I propose to add new function - *schema_of_csv()* which infers schema of CSV string literal. The result of the function is a string containing a schema in DDL format. For example:
```sql
select schema_of_csv('1|abc', map('delimiter', '|'))
```
```
struct<_c0:int,_c1:string>
```
## How was this patch tested?
Added new tests to `CsvFunctionsSuite`, `CsvExpressionsSuite` and SQL tests to `csv-functions.sql`
Closes#22666 from MaxGekk/schema_of_csv-function.
Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The main purpose of `schema_of_json` is the usage of combination with `from_json` (to make up the leak of schema inference) which takes its schema only as literal; however, currently `schema_of_json` allows JSON input as non-literal expressions (e.g, column).
This was mistakenly allowed - we don't have to take other usages rather then the main purpose into account for now.
This PR makes a followup to only allow literals for `schema_of_json`'s JSON input. We can allow non literal expressions later when it's needed or there are some usecase for it.
## How was this patch tested?
Unit tests were added.
Closes#22775 from HyukjinKwon/SPARK-25447-followup.
Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
See the detailed information at https://issues.apache.org/jira/browse/SPARK-25841 on why these APIs should be deprecated and redesigned.
This patch also reverts 8acb51f08b which applies to 2.4.
## How was this patch tested?
Only deprecation and doc changes.
Closes#22841 from rxin/SPARK-25842.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
In the PR, I propose to switch `from_json` on `FailureSafeParser`, and to make the function compatible to `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.
## How was this patch tested?
It was tested by existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite` as well as new tests for `from_json` which checks different modes.
Closes#22237 from MaxGekk/from_json-failuresafe.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
The PR adds new function `from_csv()` similar to `from_json()` to parse columns with CSV strings. I added the following methods:
```Scala
def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column
```
and this signature to call it from Python, R and Java:
```Scala
def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column
```
## How was this patch tested?
Added new test suites `CsvExpressionsSuite`, `CsvFunctionsSuite` and sql tests.
Closes#22379 from MaxGekk/from_csv.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
We are facing some problems about type conversions between Python data and SQL types in UDFs (Pandas UDFs as well).
It's even difficult to identify the problems (see https://github.com/apache/spark/pull/20163 and https://github.com/apache/spark/pull/22610).
This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them.
```python
import sys
import array
import datetime
from decimal import Decimal
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf
if sys.version >= '3':
long = int
data = [
None,
True,
1,
long(1),
"a",
u"a",
datetime.date(1970, 1, 1),
datetime.datetime(1970, 1, 1, 0, 0),
1.0,
array.array("i", [1]),
[1],
(1,),
bytearray([65, 66, 67]),
Decimal(1),
{"a": 1},
Row(kwargs=1),
Row("namedtuple")(1),
]
types = [
BooleanType(),
ByteType(),
ShortType(),
IntegerType(),
LongType(),
StringType(),
DateType(),
TimestampType(),
FloatType(),
DoubleType(),
ArrayType(IntegerType()),
BinaryType(),
DecimalType(10, 0),
MapType(StringType(), IntegerType()),
StructType([StructField("_1", IntegerType())]),
]
df = spark.range(1)
results = []
count = 0
total = len(types) * len(data)
spark.sparkContext.setLogLevel("FATAL")
for t in types:
result = []
for v in data:
try:
row = df.select(udf(lambda: v, t)()).first()
ret_str = repr(row[0])
except Exception:
ret_str = "X"
result.append(ret_str)
progress = "SQL Type: [%s]\n Python Value: [%s(%s)]\n Result Python Value: [%s]" % (
t.simpleString(), str(v), type(v).__name__, ret_str)
count += 1
print("%s/%s:\n %s" % (count, total, progress))
results.append([t.simpleString()] + list(map(str, result)))
schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n"))))
```
This table was generated under Python 2 but the code above is Python 3 compatible as well.
## How was this patch tested?
Manually tested and lint check.
Closes#22655 from HyukjinKwon/SPARK-25666.
Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
For Pandas UDFs, we get arrow type from defined Catalyst return data type of UDFs. We use this arrow type to do serialization of data. If the defined return data type doesn't match with actual return type of Pandas.Series returned by Pandas UDFs, it has a risk to return incorrect data from Python side.
Currently we don't have reliable approach to check if the data conversion is safe or not. We leave some document to notify this to users for now. When there is next upgrade of PyArrow available we can use to check it, we should add the option to check it.
## How was this patch tested?
Only document change.
Closes#22610 from viirya/SPARK-25461.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Adds support for the setting limit in the sql split function
## How was this patch tested?
1. Updated unit tests
2. Tested using Scala spark shell
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#22227 from phegstrom/master.
Authored-by: Parker Hegstrom <phegstrom@palantir.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This patch is to bump the master branch version to 3.0.0-SNAPSHOT.
## How was this patch tested?
N/A
Closes#22606 from gatorsmile/bump3.0.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
In the PR, I propose to extended the `schema_of_json()` function, and accept JSON options since they can impact on schema inferring. Purpose is to support the same options that `from_json` can use during schema inferring.
## How was this patch tested?
Added SQL, Python and Scala tests (`JsonExpressionsSuite` and `JsonFunctionsSuite`) that checks JSON options are used.
Closes#22442 from MaxGekk/schema_of_json-options.
Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
We have an agreement that the behavior of `from/to_utc_timestamp` is corrected, although the function itself doesn't make much sense in Spark: https://issues.apache.org/jira/browse/SPARK-23715
This PR improves the document.
## How was this patch tested?
N/A
Closes#22543 from cloud-fan/doc.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
The PR introduces new JSON option `pretty` which allows to turn on `DefaultPrettyPrinter` of `Jackson`'s Json generator. New option is useful in exploring of deep nested columns and in converting of JSON columns in more readable representation (look at the added test).
## How was this patch tested?
Added rount trip test which convert an JSON string to pretty representation via `from_json()` and `to_json()`.
Closes#22534 from MaxGekk/pretty-json.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
(This change is a subset of the changes needed for the JIRA; see https://github.com/apache/spark/pull/22231)
## What changes were proposed in this pull request?
Use raw strings and simpler regex syntax consistently in Python, which also avoids warnings from pycodestyle about accidentally relying Python's non-escaping of non-reserved chars in normal strings. Also, fix a few long lines.
## How was this patch tested?
Existing tests, and some manual double-checking of the behavior of regexes in Python 2/3 to be sure.
Closes#22400 from srowen/SPARK-25238.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Clarify docstring for Scalar functions
## How was this patch tested?
Adds a unit test showing use similar to wordcount, there's existing unit test for array of floats as well.
Closes#20908 from holdenk/SPARK-23672-document-support-for-nested-return-types-in-scalar-with-arrow-udfs.
Authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
This PR proposes to add another example for multiple grouping key in group aggregate pandas UDF since this feature could make users still confused.
## How was this patch tested?
Manually tested and documentation built.
Closes#22329 from HyukjinKwon/SPARK-25328.
Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
In the PR, I propose to extended `to_json` and support any types as element types of input arrays. It should allow converting arrays of primitive types and arrays of arrays. For example:
```
select to_json(array('1','2','3'))
> ["1","2","3"]
select to_json(array(array(1,2,3),array(4)))
> [[1,2,3],[4]]
```
## How was this patch tested?
Added a couple sql tests for arrays of primitive type and of arrays. Also I added round trip test `from_json` -> `to_json`.
Closes#22226 from MaxGekk/to_json-array.
Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Include PandasUDFType in the import all of pyspark.sql.functions
## How was this patch tested?
Run the test case from the pyspark shell from the jira [spark-25105](https://jira.apache.org/jira/browse/SPARK-25105?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL%2C%20%22Structured%20Streaming%22))
I manually test on pyspark-shell:
before:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'PandasUDFType' is not defined
>>>
`
after:
`
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>>
`
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#22100 from kevinyu98/spark-25105.
Authored-by: Kevin Yu <qyu@us.ibm.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
## What changes were proposed in this pull request?
The PR removes a restriction for element types of array type which exists in `from_json` for the root type. Currently, the function can handle only arrays of structs. Even array of primitive types is disallowed. The PR allows arrays of any types currently supported by JSON datasource. Here is an example of an array of a primitive type:
```
scala> import org.apache.spark.sql.functions._
scala> val df = Seq("[1, 2, 3]").toDF("a")
scala> val schema = new ArrayType(IntegerType, false)
scala> val arr = df.select(from_json($"a", schema))
scala> arr.printSchema
root
|-- jsontostructs(a): array (nullable = true)
| |-- element: integer (containsNull = true)
```
and result of converting of the json string to the `ArrayType`:
```
scala> arr.show
+----------------+
|jsontostructs(a)|
+----------------+
| [1, 2, 3]|
+----------------+
```
## How was this patch tested?
I added a few positive and negative tests:
- array of primitive types
- array of arrays
- array of structs
- array of maps
Closes#21439 from MaxGekk/from_json-array.
Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in the intersection of array1 and array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21102 from kiszk/SPARK-23913.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in array1 but not in array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21103 from kiszk/SPARK-23915.
## What changes were proposed in this pull request?
Update Pandas UDFs section in sql-programming-guide. Add section for grouped aggregate pandas UDF.
## How was this patch tested?
Author: Li Jin <ice.xelloss@gmail.com>
Closes#21887 from icexelloss/SPARK-23633-sql-programming-guide.
## What changes were proposed in this pull request?
This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.
## How was this patch tested?
New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.
Author: Takuya UESHIN <ueshin@databricks.com>
Author: pkuwm <ihuizhi.lu@gmail.com>
Closes#21802 from ueshin/issues/SPARK-23928/shuffle.
## What changes were proposed in this pull request?
Add ```sequence``` in functions.py
## How was this patch tested?
Add doctest.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21820 from huaxingao/spark-24868.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_union`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in the union of array1 and array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21061 from kiszk/SPARK-23914.
## What changes were proposed in this pull request?
Implement map_concat high order function.
This implementation does not pick a winner when the specified maps have overlapping keys. Therefore, this implementation preserves existing duplicate keys in the maps and potentially introduces new duplicates (After discussion with ueshin, we settled on option 1 from [here](https://issues.apache.org/jira/browse/SPARK-23936?focusedCommentId=16464245&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16464245)).
## How was this patch tested?
New tests
Manual tests
Run all sbt SQL tests
Run all pyspark sql tests
Author: Bruce Robbins <bersprockets@gmail.com>
Closes#21073 from bersprockets/SPARK-23936.
## What changes were proposed in this pull request?
This pr supported column arguments in timezone of `from_utc_timestamp/to_utc_timestamp` (follow-up of #21693).
## How was this patch tested?
Added tests.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21723 from maropu/SPARK-24673-FOLLOWUP.
## What changes were proposed in this pull request?
In the PR, I propose to add new function - *schema_of_json()* which infers schema of JSON string literal. The result of the function is a string containing a schema in DDL format.
One of the use cases is using of *schema_of_json()* in the combination with *from_json()*. Currently, _from_json()_ requires a schema as a mandatory argument. The *schema_of_json()* function will allow to point out an JSON string as an example which has the same schema as the first argument of _from_json()_. For instance:
```sql
select from_json(json_column, schema_of_json('{"c1": [0], "c2": [{"c3":0}]}'))
from json_table;
```
## How was this patch tested?
Added new test to `JsonFunctionsSuite`, `JsonExpressionsSuite` and SQL tests to `json-functions.sql`
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21686 from MaxGekk/infer_schema_json.
## What changes were proposed in this pull request?
Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on index of the return pandas.DataFrame. If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and be different than the defined schema for the UDF. If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.
This change will first try to assign columns using the return type field names. If a KeyError occurs, then the column index is checked if it is string based. If so, then the error is raised as it is most likely a naming mistake, else it will fallback to assign columns by position and raise a TypeError if the field types do not match.
## How was this patch tested?
Added a test that returns a new DataFrame with column order different than the schema.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21427 from BryanCutler/arrow-grouped-map-mixesup-cols-SPARK-24324.
## What changes were proposed in this pull request?
Add array_distinct to remove duplicate value from the array.
## How was this patch tested?
Add unit tests
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21050 from huaxingao/spark-23912.
## What changes were proposed in this pull request?
In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```
## How was this patch tested?
Added a couple sql tests and modified existing tests for Python and Scala. The former tests were modified because it is not imported for them in which format schema for `from_json` is provided.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21550 from MaxGekk/from_json-ddl-schema.
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.
```
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
... def mean_udf(v):
... return v.mean()
>>> w = Window.partitionBy('id')
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
```
The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)
Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.
## How was this patch tested?
WindowPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#21082 from icexelloss/SPARK-22239-window-udf.
## What changes were proposed in this pull request?
The PR adds the SQL function `map_from_arrays`. The behavior of the function is based on Presto's `map`. Since SparkSQL already had a `map` function, we prepared the different name for this behavior.
This function returns returns a map from a pair of arrays for keys and values.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21258 from kiszk/SPARK-23933.
Signed-off-by: DylanGuedes <djmgguedesgmail.com>
## What changes were proposed in this pull request?
Addition of arrays_zip function to spark sql functions.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Unit tests that checks if the results are correct.
Author: DylanGuedes <djmgguedes@gmail.com>
Closes#21045 from DylanGuedes/SPARK-23931.
## What changes were proposed in this pull request?
add array_remove to remove all elements that equal element from array
## How was this patch tested?
add unit tests
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21069 from huaxingao/spark-23920.
## What changes were proposed in this pull request?
Added sections to pandas_udf docs, in the grouped map section, to indicate columns are assigned by position.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21471 from BryanCutler/arrow-doc-pandas_udf-column_by_pos-SPARK-21427.
## What changes were proposed in this pull request?
The pandas_udf functionality was introduced in 2.3.0, but is not completely stable and still evolving. This adds a label to indicate it is still an experimental API.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21435 from BryanCutler/arrow-pandas_udf-experimental-SPARK-24392.
## What changes were proposed in this pull request?
The PR adds the function `arrays_overlap`. This function returns `true` if the input arrays contain a non-null common element; if not, it returns `null` if any of the arrays contains a `null` element, `false` otherwise.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21028 from mgaido91/SPARK-23922.
## What changes were proposed in this pull request?
The PR adds a new collection function, array_repeat. As there already was a function repeat with the same signature, with the only difference being the expected return type (String instead of Array), the new function is called array_repeat to distinguish.
The behaviour of the function is based on Presto's one.
The function creates an array containing a given element repeated the requested number of times.
## How was this patch tested?
New unit tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
Author: Florent Pépin <florentpepin.92@gmail.com>
Author: Florent Pépin <florent.pepin14@imperial.ac.uk>
Closes#21208 from pepinoflo/SPARK-23925.
## What changes were proposed in this pull request?
Currently, the from_json function support StructType or ArrayType as the root type. The PR allows to specify MapType(StringType, DataType) as the root type additionally to mentioned types. For example:
```scala
import org.apache.spark.sql.types._
val schema = MapType(StringType, IntegerType)
val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
in.select(from_json($"value", schema, Map[String, String]())).collect()
```
```
res1: Array[org.apache.spark.sql.Row] = Array([Map(a -> 1, b -> 2, c -> 3)])
```
## How was this patch tested?
It was checked by new tests for the map type with integer type and struct type as value types. Also roundtrip tests like from_json(to_json) and to_json(from_json) for MapType are added.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21108 from MaxGekk/from_json-map-type.
## What changes were proposed in this pull request?
It's useful to know what relationship between date1 and date2 results in a positive number.
Author: aditkumar <aditkumar@gmail.com>
Author: Adit Kumar <aditkumar@gmail.com>
Closes#20787 from aditkumar/master.
## What changes were proposed in this pull request?
I propose to add a clear statement for functions like `collect_list()` about non-deterministic behavior of such functions. The behavior must be taken into account by user while creating and running queries.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21228 from MaxGekk/deterministic-comments.
## What changes were proposed in this pull request?
The PR add the `slice` function. The behavior of the function is based on Presto's one.
The function slices an array according to the requested start index and length.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21040 from mgaido91/SPARK-23930.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_sort`. The behavior of the function is based on Presto's one.
The function sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21021 from kiszk/SPARK-23921.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_join`. The behavior of the function is based on Presto's one.
The function accepts an `array` of `string` which is to be joined, a `string` which is the delimiter to use between the items of the first argument and optionally a `string` which is used to replace `null` values.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21011 from mgaido91/SPARK-23916.
## What changes were proposed in this pull request?
HIVE-15511 introduced the `roundOff` flag in order to disable the rounding to 8 digits which is performed in `months_between`. Since this can be a computational intensive operation, skipping it may improve performances when the rounding is not needed.
## How was this patch tested?
modified existing UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21008 from mgaido91/SPARK-23902.
## What changes were proposed in this pull request?
The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.
This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21053 from kiszk/SPARK-23924.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.
The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21037 from kiszk/SPARK-23919.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21025 from mgaido91/SPARK-23918.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21024 from mgaido91/SPARK-23917.
## What changes were proposed in this pull request?
Column.scala and Functions.scala have asc_nulls_first, asc_nulls_last, desc_nulls_first and desc_nulls_last. Add the corresponding python APIs in column.py and functions.py
## How was this patch tested?
Add doctest
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20962 from huaxingao/spark-23847.
## What changes were proposed in this pull request?
Add documentation about the limitations of `pandas_udf` with keyword arguments and related concepts, like `functools.partial` fn objects.
NOTE: intermediate commits on this PR show some of the steps that can be taken to fix some (but not all) of these pain points.
### Survey of problems we face today:
(Initialize) Note: python 3.6 and spark 2.4snapshot.
```
from pyspark.sql import SparkSession
import inspect, functools
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, udf
spark = SparkSession.builder.getOrCreate()
print(spark.version)
df = spark.range(1,6).withColumn('b', col('id') * 2)
def ok(a,b): return a+b
```
Using a keyword argument at the call site `b=...` (and yes, *full* stack trace below, haha):
```
---> 14 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', b='id')).show() # no kwargs
TypeError: wrapper() got an unexpected keyword argument 'b'
```
Using partial with a keyword argument where the kw-arg is the first argument of the fn:
*(Aside: kind of interesting that lines 15,16 work great and then 17 explodes)*
```
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-e9f31b8799c1> in <module>()
15 df.withColumn('ok', pandas_udf(f=functools.partial(ok, 7), returnType='bigint')('id')).show()
16 df.withColumn('ok', pandas_udf(f=functools.partial(ok, b=7), returnType='bigint')('id')).show()
---> 17 df.withColumn('ok', pandas_udf(f=functools.partial(ok, a=7), returnType='bigint')('id')).show()
/Users/stu/ZZ/spark/python/pyspark/sql/functions.py in pandas_udf(f, returnType, functionType)
2378 return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
2379 else:
-> 2380 return _create_udf(f=f, returnType=return_type, evalType=eval_type)
2381
2382
/Users/stu/ZZ/spark/python/pyspark/sql/udf.py in _create_udf(f, returnType, evalType)
54 argspec.varargs is None:
55 raise ValueError(
---> 56 "Invalid function: 0-arg pandas_udfs are not supported. "
57 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
58 )
ValueError: Invalid function: 0-arg pandas_udfs are not supported. Instead, create a 1-arg pandas_udf and ignore the arg in your function.
```
Author: Michael (Stu) Stewart <mstewart141@gmail.com>
Closes#20900 from mstewart141/udfkw2.
## What changes were proposed in this pull request?
This cleans up unused imports, mainly from pyspark.sql module. Added a note in function.py that imports `UserDefinedFunction` only to maintain backwards compatibility for using `from pyspark.sql.function import UserDefinedFunction`.
## How was this patch tested?
Existing tests and built docs.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20892 from BryanCutler/pyspark-cleanup-imports-SPARK-23700.
The exit() builtin is only for interactive use. applications should use sys.exit().
## What changes were proposed in this pull request?
All usage of the builtin `exit()` function is replaced by `sys.exit()`.
## How was this patch tested?
I ran `python/run-tests`.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Benjamin Peterson <benjamin@python.org>
Closes#20682 from benjaminp/sys-exit.
## What changes were proposed in this pull request?
This PR proposes to support an alternative function from with group aggregate pandas UDF.
The current form:
```
def foo(pdf):
return ...
```
Takes a single arg that is a pandas DataFrame.
With this PR, an alternative form is supported:
```
def foo(key, pdf):
return ...
```
The alternative form takes two argument - a tuple that presents the grouping key, and a pandas DataFrame represents the data.
## How was this patch tested?
GroupbyApplyTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20295 from icexelloss/SPARK-23011-groupby-apply-key.
## What changes were proposed in this pull request?
Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions.
## How was this patch tested?
Ran full build, checked generated documentation manually
Author: Mihaly Toth <misutoth@gmail.com>
Closes#20618 from misutoth/trigonometric-doc.
## What changes were proposed in this pull request?
Added unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark, also updated the rangeBetween API
## How was this patch tested?
did unit test on my local. Please let me know if I need to add unit test in tests.py
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20400 from huaxingao/spark_23084.
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20495 from gatorsmile/updateFunc.
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.
- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`
## How was this patch tested?
The existing tests
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20428 from gatorsmile/renamePandasUDFs.
## What changes were proposed in this pull request?
Add support for using pandas UDFs with groupby().agg().
This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.
This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.
## How was this patch tested?
GroupbyAggPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19872 from icexelloss/SPARK-22274-groupby-agg.
## What changes were proposed in this pull request?
Currently `UDFRegistration.registerJavaFunction` doesn't support data type string as a `returnType` whereas `UDFRegistration.register`, `udf`, or `pandas_udf` does.
We can support it for `UDFRegistration.registerJavaFunction` as well.
## How was this patch tested?
Added a doctest and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20307 from ueshin/issues/SPARK-23141.
## What changes were proposed in this pull request?
This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.
These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.
Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.
This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158
## How was this patch tested?
Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20288 from HyukjinKwon/deprecate-udf.
## What changes were proposed in this pull request?
The current `Datset.showString` prints rows thru `RowEncoder` deserializers like;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------------------------------------------+
|a |
+------------------------------------------------------------+
|[WrappedArray(1, 2), WrappedArray(3), WrappedArray(4, 5, 6)]|
+------------------------------------------------------------+
```
This result is incorrect because the correct one is;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------+
|a |
+------------------------+
|[[1, 2], [3], [4, 5, 6]]|
+------------------------+
```
So, this pr fixed code in `showString` to cast field data to strings before printing.
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#20214 from maropu/SPARK-23023.
## What changes were proposed in this pull request?
This PR proposes to add a note that saying the length of a scalar Pandas UDF's `Series` is not of the whole input column but of the batch.
We are fine for a group map UDF because the usage is different from our typical UDF but scalar UDFs might cause confusion with the normal UDF.
For example, please consider this example:
```python
from pyspark.sql.functions import pandas_udf, col, lit
df = spark.range(1)
f = pandas_udf(lambda x, y: len(x) + y, LongType())
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 1|
+------------------+
```
```python
from pyspark.sql.functions import udf, col, lit
df = spark.range(1)
f = udf(lambda x, y: len(x) + y, "long")
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 4|
+------------------+
```
## How was this patch tested?
Manually built the doc and checked the output.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20237 from HyukjinKwon/SPARK-22980.
## What changes were proposed in this pull request?
Add tests for using non deterministic UDFs in aggregate.
Update pandas_udf docstring w.r.t to determinism.
## How was this patch tested?
test_nondeterministic_udf_in_aggregate
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.