## What changes were proposed in this pull request?
The PR adds the SQL function `array_join`. The behavior of the function is based on Presto's one.
The function accepts an `array` of `string` which is to be joined, a `string` which is the delimiter to use between the items of the first argument and optionally a `string` which is used to replace `null` values.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21011 from mgaido91/SPARK-23916.
## What changes were proposed in this pull request?
HIVE-15511 introduced the `roundOff` flag in order to disable the rounding to 8 digits which is performed in `months_between`. Since this can be a computational intensive operation, skipping it may improve performances when the rounding is not needed.
## How was this patch tested?
modified existing UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21008 from mgaido91/SPARK-23902.
## What changes were proposed in this pull request?
The PR adds the SQL function `element_at`. The behavior of the function is based on Presto's one.
This function returns element of array at given index in value if column is array, or returns value for the given key in value if column is map.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21053 from kiszk/SPARK-23924.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_position`. The behavior of the function is based on Presto's one.
The function returns the position of the first occurrence of the element in array x (or 0 if not found) using 1-based index as BigInt.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21037 from kiszk/SPARK-23919.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_min`. It takes an array as argument and returns the minimum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21025 from mgaido91/SPARK-23918.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_max`. It takes an array as argument and returns the maximum value in it.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21024 from mgaido91/SPARK-23917.
## What changes were proposed in this pull request?
Column.scala and Functions.scala have asc_nulls_first, asc_nulls_last, desc_nulls_first and desc_nulls_last. Add the corresponding python APIs in column.py and functions.py
## How was this patch tested?
Add doctest
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20962 from huaxingao/spark-23847.
## What changes were proposed in this pull request?
Add documentation about the limitations of `pandas_udf` with keyword arguments and related concepts, like `functools.partial` fn objects.
NOTE: intermediate commits on this PR show some of the steps that can be taken to fix some (but not all) of these pain points.
### Survey of problems we face today:
(Initialize) Note: python 3.6 and spark 2.4snapshot.
```
from pyspark.sql import SparkSession
import inspect, functools
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, udf
spark = SparkSession.builder.getOrCreate()
print(spark.version)
df = spark.range(1,6).withColumn('b', col('id') * 2)
def ok(a,b): return a+b
```
Using a keyword argument at the call site `b=...` (and yes, *full* stack trace below, haha):
```
---> 14 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', b='id')).show() # no kwargs
TypeError: wrapper() got an unexpected keyword argument 'b'
```
Using partial with a keyword argument where the kw-arg is the first argument of the fn:
*(Aside: kind of interesting that lines 15,16 work great and then 17 explodes)*
```
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-e9f31b8799c1> in <module>()
15 df.withColumn('ok', pandas_udf(f=functools.partial(ok, 7), returnType='bigint')('id')).show()
16 df.withColumn('ok', pandas_udf(f=functools.partial(ok, b=7), returnType='bigint')('id')).show()
---> 17 df.withColumn('ok', pandas_udf(f=functools.partial(ok, a=7), returnType='bigint')('id')).show()
/Users/stu/ZZ/spark/python/pyspark/sql/functions.py in pandas_udf(f, returnType, functionType)
2378 return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
2379 else:
-> 2380 return _create_udf(f=f, returnType=return_type, evalType=eval_type)
2381
2382
/Users/stu/ZZ/spark/python/pyspark/sql/udf.py in _create_udf(f, returnType, evalType)
54 argspec.varargs is None:
55 raise ValueError(
---> 56 "Invalid function: 0-arg pandas_udfs are not supported. "
57 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
58 )
ValueError: Invalid function: 0-arg pandas_udfs are not supported. Instead, create a 1-arg pandas_udf and ignore the arg in your function.
```
Author: Michael (Stu) Stewart <mstewart141@gmail.com>
Closes#20900 from mstewart141/udfkw2.
## What changes were proposed in this pull request?
This cleans up unused imports, mainly from pyspark.sql module. Added a note in function.py that imports `UserDefinedFunction` only to maintain backwards compatibility for using `from pyspark.sql.function import UserDefinedFunction`.
## How was this patch tested?
Existing tests and built docs.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20892 from BryanCutler/pyspark-cleanup-imports-SPARK-23700.
The exit() builtin is only for interactive use. applications should use sys.exit().
## What changes were proposed in this pull request?
All usage of the builtin `exit()` function is replaced by `sys.exit()`.
## How was this patch tested?
I ran `python/run-tests`.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Benjamin Peterson <benjamin@python.org>
Closes#20682 from benjaminp/sys-exit.
## What changes were proposed in this pull request?
This PR proposes to support an alternative function from with group aggregate pandas UDF.
The current form:
```
def foo(pdf):
return ...
```
Takes a single arg that is a pandas DataFrame.
With this PR, an alternative form is supported:
```
def foo(key, pdf):
return ...
```
The alternative form takes two argument - a tuple that presents the grouping key, and a pandas DataFrame represents the data.
## How was this patch tested?
GroupbyApplyTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20295 from icexelloss/SPARK-23011-groupby-apply-key.
## What changes were proposed in this pull request?
Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions.
## How was this patch tested?
Ran full build, checked generated documentation manually
Author: Mihaly Toth <misutoth@gmail.com>
Closes#20618 from misutoth/trigonometric-doc.
## What changes were proposed in this pull request?
Added unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark, also updated the rangeBetween API
## How was this patch tested?
did unit test on my local. Please let me know if I need to add unit test in tests.py
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#20400 from huaxingao/spark_23084.
## What changes were proposed in this pull request?
Update the description and tests of three external API or functions `createFunction `, `length` and `repartitionByRange `
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20495 from gatorsmile/updateFunc.
## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.
- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`
## How was this patch tested?
The existing tests
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20428 from gatorsmile/renamePandasUDFs.
## What changes were proposed in this pull request?
Add support for using pandas UDFs with groupby().agg().
This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.
This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.
## How was this patch tested?
GroupbyAggPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19872 from icexelloss/SPARK-22274-groupby-agg.
## What changes were proposed in this pull request?
Currently `UDFRegistration.registerJavaFunction` doesn't support data type string as a `returnType` whereas `UDFRegistration.register`, `udf`, or `pandas_udf` does.
We can support it for `UDFRegistration.registerJavaFunction` as well.
## How was this patch tested?
Added a doctest and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#20307 from ueshin/issues/SPARK-23141.
## What changes were proposed in this pull request?
This PR proposes to deprecate `register*` for UDFs in `SQLContext` and `Catalog` in Spark 2.3.0.
These are inconsistent with Scala / Java APIs and also these basically do the same things with `spark.udf.register*`.
Also, this PR moves the logcis from `[sqlContext|spark.catalog].register*` to `spark.udf.register*` and reuse the docstring.
This PR also handles minor doc corrections. It also includes https://github.com/apache/spark/pull/20158
## How was this patch tested?
Manually tested, manually checked the API documentation and tests added to check if deprecated APIs call the aliases correctly.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20288 from HyukjinKwon/deprecate-udf.
## What changes were proposed in this pull request?
The current `Datset.showString` prints rows thru `RowEncoder` deserializers like;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------------------------------------------+
|a |
+------------------------------------------------------------+
|[WrappedArray(1, 2), WrappedArray(3), WrappedArray(4, 5, 6)]|
+------------------------------------------------------------+
```
This result is incorrect because the correct one is;
```
scala> Seq(Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))).toDF("a").show(false)
+------------------------+
|a |
+------------------------+
|[[1, 2], [3], [4, 5, 6]]|
+------------------------+
```
So, this pr fixed code in `showString` to cast field data to strings before printing.
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#20214 from maropu/SPARK-23023.
## What changes were proposed in this pull request?
This PR proposes to add a note that saying the length of a scalar Pandas UDF's `Series` is not of the whole input column but of the batch.
We are fine for a group map UDF because the usage is different from our typical UDF but scalar UDFs might cause confusion with the normal UDF.
For example, please consider this example:
```python
from pyspark.sql.functions import pandas_udf, col, lit
df = spark.range(1)
f = pandas_udf(lambda x, y: len(x) + y, LongType())
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 1|
+------------------+
```
```python
from pyspark.sql.functions import udf, col, lit
df = spark.range(1)
f = udf(lambda x, y: len(x) + y, "long")
df.select(f(lit('text'), col('id'))).show()
```
```
+------------------+
|<lambda>(text, id)|
+------------------+
| 4|
+------------------+
```
## How was this patch tested?
Manually built the doc and checked the output.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#20237 from HyukjinKwon/SPARK-22980.
## What changes were proposed in this pull request?
Add tests for using non deterministic UDFs in aggregate.
Update pandas_udf docstring w.r.t to determinism.
## How was this patch tested?
test_nondeterministic_udf_in_aggregate
Author: Li Jin <ice.xelloss@gmail.com>
Closes#20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.
## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19977 from maropu/SPARK-22771.
## What changes were proposed in this pull request?
In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.
This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.
## How was this patch tested?
Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col = udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello| 3| 13|
+-----+----+-------------+
```
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19929 from mgaido91/SPARK-22629.
## What changes were proposed in this pull request?
Upgrade Spark to Arrow 0.8.0 for Java and Python. Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.
The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:
* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python
## How was this patch tested?
Existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
## What changes were proposed in this pull request?
Besides conditional expressions such as `when` and `if`, users may want to conditionally execute python udfs by short-curcuit evaluation. We should also explicitly note that python udfs don't support this kind of conditional execution too.
## How was this patch tested?
N/A, just document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19787 from viirya/SPARK-22541.
## What changes were proposed in this pull request?
* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUdfType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"
Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType
pandas_udf('double', PandasUDFType.SCALAR):
def plus_one(v):
return v + 1
```
## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit
## How was this patch tested?
Added PandasUDFTests
## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19630 from icexelloss/spark-22409-pandas-udf-type.
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.
## How was this patch tested?
Unit tests and manual tests.
Author: ptkool <michael.styles@shopify.com>
Closes#19672 from ptkool/day_of_week_function.
## What changes were proposed in this pull request?
Under the current execution mode of Python UDFs, we don't well support Python UDFs as branch values or else value in CaseWhen expression.
Since to fix it might need the change not small (e.g., #19592) and this issue has simpler workaround. We should just notice users in the document about this.
## How was this patch tested?
Only document change.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19617 from viirya/SPARK-22347-3.
## What changes were proposed in this pull request?
This PR proposes to mark the existing warnings as `DeprecationWarning` and print out warnings for deprecated functions.
This could be actually useful for Spark app developers. I use (old) PyCharm and this IDE can detect this specific `DeprecationWarning` in some cases:
**Before**
<img src="https://user-images.githubusercontent.com/6477701/31762664-df68d9f8-b4f6-11e7-8773-f0468f70a2cc.png" height="45" />
**After**
<img src="https://user-images.githubusercontent.com/6477701/31762662-de4d6868-b4f6-11e7-98dc-3c8446a0c28a.png" height="70" />
For console usage, `DeprecationWarning` is usually disabled (see https://docs.python.org/2/library/warnings.html#warning-categories and https://docs.python.org/3/library/warnings.html#warning-categories):
```
>>> import warnings
>>> filter(lambda f: f[2] == DeprecationWarning, warnings.filters)
[('ignore', <_sre.SRE_Pattern object at 0x10ba58c00>, <type 'exceptions.DeprecationWarning'>, <_sre.SRE_Pattern object at 0x10bb04138>, 0), ('ignore', None, <type 'exceptions.DeprecationWarning'>, None, 0)]
```
so, it won't actually mess up the terminal much unless it is intended.
If this is intendedly enabled, it'd should as below:
```
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>>
>>> from pyspark.sql import functions
>>> functions.approxCountDistinct("a")
.../spark/python/pyspark/sql/functions.py:232: DeprecationWarning: Deprecated in 2.1, use approx_count_distinct instead.
"Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
...
```
These instances were found by:
```
cd python/pyspark
grep -r "Deprecated" .
grep -r "deprecated" .
grep -r "deprecate" .
```
## How was this patch tested?
Manually tested.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19535 from HyukjinKwon/deprecated-warning.
## What changes were proposed in this pull request?
This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.
## How was this patch tested?
Exisiting tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19517 from ueshin/issues/SPARK-20396/fup2.
## What changes were proposed in this pull request?
This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.
Static schema
-------------------
```
schema = df.schema
pandas_udf(schema)
def normalize(df):
df = df.assign(v1 = (df.v1 - df.v1.mean()) / df.v1.std()
return df
df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**
Another example to use pd.DataFrame dtypes as output schema of the udf:
```
sample_df = df.filter(df.id == 1).toPandas()
def foo(df):
ret = # Some transformation on the input pd.DataFrame
return ret
foo_udf = pandas_udf(foo, foo(sample_df).dtypes)
df.groupBy('id').apply(foo_udf)
```
In interactive use case, user usually have a sample pd.DataFrame to test function `foo` in their notebook. Having been able to use `foo(sample_df).dtypes` frees user from specifying the output schema of `foo`.
Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md
## How was this patch tested?
* Added GroupbyApplyTest
Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#18732 from icexelloss/groupby-apply-SPARK-20396.
## What changes were proposed in this pull request?
Fixed some minor issues with pandas_udf related docs and formatting.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19375 from BryanCutler/arrow-pandas_udf-cleanup-minor.
## What changes were proposed in this pull request?
This change disables the use of 0-parameter pandas_udfs due to the API being overly complex and awkward, and can easily be worked around by using an index column as an input argument. Also added doctests for pandas_udfs which revealed bugs for handling empty partitions and using the pandas_udf decorator.
## How was this patch tested?
Reworked existing 0-parameter test to verify error is raised, added doctest for pandas_udf, added new tests for empty partition and decorator usage.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#19325 from BryanCutler/arrow-pandas_udf-0-param-remove-SPARK-22106.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
pandas_udf(DoubleType())
def plus(a, b)
return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs["size"])
df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example
## How was this patch tested?
Doc only change / existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19276 from srowen/SPARK-22049.
## What changes were proposed in this pull request?
In previous work SPARK-21513, we has allowed `MapType` and `ArrayType` of `MapType`s convert to a json string but only for Scala API. In this follow-up PR, we will make SparkSQL support it for PySpark and SparkR, too. We also fix some little bugs and comments of the previous work in this follow-up PR.
### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.
cc viirya HyukjinKwon
Author: goldmedal <liugs963@gmail.com>
Closes#19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
## What changes were proposed in this pull request?
Enhanced some existing documentation
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Mac <maclockard@gmail.com>
Closes#18710 from maclockard/maclockard-patch-1.
## What changes were proposed in this pull request?
This PR proposes to avoid `__name__` in the tuple naming the attributes assigned directly from the wrapped function to the wrapper function, and use `self._name` (`func.__name__` or `obj.__class__.name__`).
After SPARK-19161, we happened to break callable objects as UDFs in Python as below:
```python
from pyspark.sql import functions
class F(object):
def __call__(self, x):
return x
foo = F()
udf = functions.udf(foo)
```
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/functions.py", line 2142, in udf
return _udf(f=f, returnType=returnType)
File ".../spark/python/pyspark/sql/functions.py", line 2133, in _udf
return udf_obj._wrapped()
File ".../spark/python/pyspark/sql/functions.py", line 2090, in _wrapped
functools.wraps(self.func)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: F instance has no attribute '__name__'
```
This worked in Spark 2.1:
```python
from pyspark.sql import functions
class F(object):
def __call__(self, x):
return x
foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```
```
+-----+
|F(id)|
+-----+
| 0|
+-----+
```
**After**
```python
from pyspark.sql import functions
class F(object):
def __call__(self, x):
return x
foo = F()
udf = functions.udf(foo)
spark.range(1).select(udf("id")).show()
```
```
+-----+
|F(id)|
+-----+
| 0|
+-----+
```
_In addition, we also happened to break partial functions as below_:
```python
from pyspark.sql import functions
from functools import partial
partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
```
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/functions.py", line 2154, in udf
return _udf(f=f, returnType=returnType)
File ".../spark/python/pyspark/sql/functions.py", line 2145, in _udf
return udf_obj._wrapped()
File ".../spark/python/pyspark/sql/functions.py", line 2099, in _wrapped
functools.wraps(self.func, assigned=assignments)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/functools.py", line 33, in update_wrapper
setattr(wrapper, attr, getattr(wrapped, attr))
AttributeError: 'functools.partial' object has no attribute '__module__'
```
This worked in Spark 2.1:
```python
from pyspark.sql import functions
from functools import partial
partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```
```
+---------+
|partial()|
+---------+
| 1|
+---------+
```
**After**
```python
from pyspark.sql import functions
from functools import partial
partial_func = partial(lambda x: x, x=1)
udf = functions.udf(partial_func)
spark.range(1).select(udf()).show()
```
```
+---------+
|partial()|
+---------+
| 1|
+---------+
```
## How was this patch tested?
Unit tests in `python/pyspark/sql/tests.py` and manual tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18615 from HyukjinKwon/callable-object.
## What changes were proposed in this pull request?
This PR deals with four points as below:
- Reuse existing DDL parser APIs rather than reimplementing within PySpark
- Support DDL formatted string, `field type, field type`.
- Support case-insensitivity for parsing.
- Support nested data types as below:
**Before**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
...
ValueError: The strcut field string format is: 'field_name:field_type', but got: a: struct<b: int>
```
```
>>> spark.createDataFrame([[1]], "a int").show()
...
ValueError: Could not parse datatype: a int
```
**After**
```
>>> spark.createDataFrame([[[1]]], "struct<a: struct<b: int>>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[[1]]], "a: struct<b: int>").show()
+---+
| a|
+---+
|[1]|
+---+
```
```
>>> spark.createDataFrame([[1]], "a int").show()
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18590 from HyukjinKwon/deduplicate-python-ddl.
## What changes were proposed in this pull request?
This PR proposes to simply ignore the results in examples that are timezone-dependent in `unix_timestamp` and `from_unixtime`.
```
Failed example:
time_df.select(unix_timestamp('dt', 'yyyy-MM-dd').alias('unix_time')).collect()
Expected:
[Row(unix_time=1428476400)]
Got:unix_timestamp
[Row(unix_time=1428418800)]
```
```
Failed example:
time_df.select(from_unixtime('unix_time').alias('ts')).collect()
Expected:
[Row(ts=u'2015-04-08 00:00:00')]
Got:
[Row(ts=u'2015-04-08 16:00:00')]
```
## How was this patch tested?
Manually tested and `./run-tests --modules pyspark-sql`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18597 from HyukjinKwon/SPARK-20456.
## What changes were proposed in this pull request?
This PR supports schema in a DDL formatted string for `from_json` in R/Python and `dapply` and `gapply` in R, which are commonly used and/or consistent with Scala APIs.
Additionally, this PR exposes `structType` in R to allow working around in other possible corner cases.
**Python**
`from_json`
```python
from pyspark.sql.functions import from_json
data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```
**R**
`from_json`
```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```
`structType.character`
```R
structType("a STRING, b INT")
```
`dapply`
```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```
`gapply`
```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```
## How was this patch tested?
Doc tests for `from_json` in Python and unit tests `test_sparkSQL.R` in R.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18498 from HyukjinKwon/SPARK-21266.
## What changes were proposed in this pull request?
This adds documentation to many functions in pyspark.sql.functions.py:
`upper`, `lower`, `reverse`, `unix_timestamp`, `from_unixtime`, `rand`, `randn`, `collect_list`, `collect_set`, `lit`
Add units to the trigonometry functions.
Renames columns in datetime examples to be more informative.
Adds links between some functions.
## How was this patch tested?
`./dev/lint-python`
`python python/pyspark/sql/functions.py`
`./python/run-tests.py --module pyspark-sql`
Author: Michael Patterson <map222@gmail.com>
Closes#17865 from map222/spark-20456.
## What changes were proposed in this pull request?
Add Python wrappers for `o.a.s.sql.functions.explode_outer` and `o.a.s.sql.functions.posexplode_outer`.
## How was this patch tested?
Unit tests, doctests.
Author: zero323 <zero323@users.noreply.github.com>
Closes#18049 from zero323/SPARK-20830.
## What changes were proposed in this pull request?
This fix tries to address the issue in SPARK-19975 where we
have `map_keys` and `map_values` functions in SQL yet there
is no Python equivalent functions.
This fix adds `map_keys` and `map_values` functions to Python.
## How was this patch tested?
This fix is tested manually (See Python docs for examples).
Author: Yong Tang <yong.tang.github@outlook.com>
Closes#17328 from yongtang/SPARK-19975.
## What changes were proposed in this pull request?
This PR proposes three things as below:
- Use casting rules to a timestamp in `to_timestamp` by default (it was `yyyy-MM-dd HH:mm:ss`).
- Support single argument for `to_timestamp` similarly with APIs in other languages.
For example, the one below works
```
import org.apache.spark.sql.functions._
Seq("2016-12-31 00:12:00.00").toDF("a").select(to_timestamp(col("a"))).show()
```
prints
```
+----------------------------------------+
|to_timestamp(`a`, 'yyyy-MM-dd HH:mm:ss')|
+----------------------------------------+
| 2016-12-31 00:12:00|
+----------------------------------------+
```
whereas this does not work in SQL.
**Before**
```
spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
Error in query: Invalid number of arguments for function to_timestamp; line 1 pos 7
```
**After**
```
spark-sql> SELECT to_timestamp('2016-12-31 00:12:00');
2016-12-31 00:12:00
```
- Related document improvement for SQL function descriptions and other API descriptions accordingly.
**Before**
```
spark-sql> DESCRIBE FUNCTION extended to_date;
...
Usage: to_date(date_str, fmt) - Parses the `left` expression with the `fmt` expression. Returns null with invalid input.
Extended Usage:
Examples:
> SELECT to_date('2016-12-31', 'yyyy-MM-dd');
2016-12-31
```
```
spark-sql> DESCRIBE FUNCTION extended to_timestamp;
...
Usage: to_timestamp(timestamp, fmt) - Parses the `left` expression with the `format` expression to a timestamp. Returns null with invalid input.
Extended Usage:
Examples:
> SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
2016-12-31 00:00:00.0
```
**After**
```
spark-sql> DESCRIBE FUNCTION extended to_date;
...
Usage:
to_date(date_str[, fmt]) - Parses the `date_str` expression with the `fmt` expression to
a date. Returns null with invalid input. By default, it follows casting rules to a date if
the `fmt` is omitted.
Extended Usage:
Examples:
> SELECT to_date('2009-07-30 04:17:52');
2009-07-30
> SELECT to_date('2016-12-31', 'yyyy-MM-dd');
2016-12-31
```
```
spark-sql> DESCRIBE FUNCTION extended to_timestamp;
...
Usage:
to_timestamp(timestamp[, fmt]) - Parses the `timestamp` expression with the `fmt` expression to
a timestamp. Returns null with invalid input. By default, it follows casting rules to
a timestamp if the `fmt` is omitted.
Extended Usage:
Examples:
> SELECT to_timestamp('2016-12-31 00:12:00');
2016-12-31 00:12:00
> SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
2016-12-31 00:00:00
```
## How was this patch tested?
Added tests in `datetime.sql`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#17901 from HyukjinKwon/to_timestamp_arg.
## What changes were proposed in this pull request?
- Move udf wrapping code from `functions.udf` to `functions.UserDefinedFunction`.
- Return wrapped udf from `catalog.registerFunction` and dependent methods.
- Update docstrings in `catalog.registerFunction` and `SQLContext.registerFunction`.
- Unit tests.
## How was this patch tested?
- Existing unit tests and docstests.
- Additional tests covering new feature.
Author: zero323 <zero323@users.noreply.github.com>
Closes#17831 from zero323/SPARK-18777.