7de33f56e8
### What changes were proposed in this pull request? This PR add Python API for invoking following higher functions: - `transform` - `exists` - `forall` - `filter` - `aggregate` - `zip_with` - `transform_keys` - `transform_values` - `map_filter` - `map_zip_with` to `pyspark.sql`. Each of these accepts plain Python functions of one of the following types - `(Column) -> Column: ...` - `(Column, Column) -> Column: ...` - `(Column, Column, Column) -> Column: ...` Internally this proposal piggbacks on objects supporting Scala implementation ([SPARK-27297](https://issues.apache.org/jira/browse/SPARK-27297)) by: 1. Creating required `UnresolvedNamedLambdaVariables` exposing these as PySpark `Columns` 2. Invoking Python function with these columns as arguments. 3. Using the result, and underlying JVM objects from 1., to create `expressions.LambdaFunction` which is passed to desired expression, and repacked as Python `Column`. ### Why are the changes needed? Currently higher order functions are available only using SQL and Scala API and can use only SQL expressions ```python df.selectExpr("transform(values, x -> x + 1)") ``` This works reasonably well for simple functions, but can get really ugly with complex functions (complex functions, casts), resulting objects are somewhat verbose and we don't get any IDE support. Additionally DSL used, though very simple, is not documented. With changes propose here, above query could be rewritten as: ```python df.select(transform("values", lambda x: x + 1)) ``` ### Does this PR introduce any user-facing change? No. ### How was this patch tested? - For positive cases this PR adds doctest strings covering possible usage patterns. - For negative cases (unsupported function types) this PR adds unit tests. ### Notes If approved, the same approach can be used in SparkR. Closes #27406 from zero323/SPARK-30681. Authored-by: zero323 <mszymkiewicz@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> |
||
---|---|---|
.. | ||
__init__.py | ||
test_arrow.py | ||
test_catalog.py | ||
test_column.py | ||
test_conf.py | ||
test_context.py | ||
test_dataframe.py | ||
test_datasources.py | ||
test_functions.py | ||
test_group.py | ||
test_pandas_cogrouped_map.py | ||
test_pandas_grouped_map.py | ||
test_pandas_map.py | ||
test_pandas_udf.py | ||
test_pandas_udf_grouped_agg.py | ||
test_pandas_udf_scalar.py | ||
test_pandas_udf_typehints.py | ||
test_pandas_udf_window.py | ||
test_readwriter.py | ||
test_serde.py | ||
test_session.py | ||
test_streaming.py | ||
test_types.py | ||
test_udf.py | ||
test_utils.py |