[SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF

## What changes were proposed in this pull request?

Add docs for `SCALAR_ITER` Pandas UDF.

cc: WeichenXu123 HyukjinKwon

## How was this patch tested?

Tested example code manually.

Closes #24897 from mengxr/SPARK-28056.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
Xiangrui Meng committed on 2019-06-17 20:51:36 -07:00
parent bb17aec916
commit 1b2448bc10
2 changed files with 103 additions and 0 deletions

@@ -86,6 +86,23 @@ The following example shows how to create a scalar Pandas UDF that computes the
</div>
</div>
### Scalar Iterator
The scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as the scalar Pandas UDF above except that the
underlying Python function takes an iterator of batches as input instead of a single batch and,
instead of returning a single output batch, it yields output batches or returns an iterator of
output batches.
It is useful when UDF execution requires initializing some state, e.g., loading a machine
learning model file to apply inference to every input batch.
The following example shows how to create scalar iterator Pandas UDFs:
<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
</div>
</div>
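For instance, a minimal sketch of the model-inference use case described above, where `load_model`
and its `predict` method are hypothetical placeholders:

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.SCALAR_ITER)
def predict(batch_iter):
    # Expensive setup runs once per UDF invocation, not once per batch.
    model = load_model("/path/to/model")  # hypothetical helper
    for features in batch_iter:
        yield pd.Series(model.predict(features))

# df.select(predict(col("features"))).show()
```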
### Grouped Map
Grouped map Pandas UDFs are used with `groupBy().apply()`, which implements the "split-apply-combine" pattern, sketched end to end just below.
Split-apply-combine consists of three steps:
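A minimal sketch of the full pattern, assuming a simple subtract-mean UDF for illustration:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# The output schema is declared up front; each group arrives as a
# pd.DataFrame, is transformed independently, and the results are combined.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").apply(subtract_mean).show()
```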

@@ -86,6 +86,92 @@ def scalar_pandas_udf_example(spark):
    # $example off:scalar_pandas_udf$


def scalar_iter_pandas_udf_example(spark):
    # $example on:scalar_iter_pandas_udf$
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # When the UDF is called with a single column that is not StructType,
    # the input to the underlying function is an iterator of pd.Series.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_one(batch_iter):
        for x in batch_iter:
            yield x + 1

    df.select(plus_one(col("x"))).show()
    # +-----------+
    # |plus_one(x)|
    # +-----------+
    # |          2|
    # |          3|
    # |          4|
    # +-----------+
    # When the UDF is called with more than one column,
    # the input to the underlying function is an iterator of tuples of pd.Series.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def multiply_two_cols(batch_iter):
        for a, b in batch_iter:
            yield a * b

    df.select(multiply_two_cols(col("x"), col("x"))).show()
    # +-----------------------+
    # |multiply_two_cols(x, x)|
    # +-----------------------+
    # |                      1|
    # |                      4|
    # |                      9|
    # +-----------------------+
    # When the UDF is called with a single column that is StructType,
    # the input to the underlying function is an iterator of pd.DataFrame.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def multiply_two_nested_cols(pdf_iter):
        for pdf in pdf_iter:
            yield pdf["a"] * pdf["b"]

    df.select(
        multiply_two_nested_cols(
            struct(col("x").alias("a"), col("x").alias("b"))
        ).alias("y")
    ).show()
    # +---+
    # |  y|
    # +---+
    # |  1|
    # |  4|
    # |  9|
    # +---+
    # In the UDF, you can initialize some state before processing batches.
    # Wrap your code with try/finally or use context managers to ensure
    # the release of resources at the end.
    y_bc = spark.sparkContext.broadcast(1)

    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_y(batch_iter):
        y = y_bc.value  # initialize state
        try:
            for x in batch_iter:
                yield x + y
        finally:
            pass  # release resources here, if any

    df.select(plus_y(col("x"))).show()
    # +---------+
    # |plus_y(x)|
    # +---------+
    # |        2|
    # |        3|
    # |        4|
    # +---------+
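    # A context-manager variant of the same pattern (a minimal sketch):
    # contextlib.ExitStack releases anything registered on it when the
    # generator finishes, covering the "use context managers" advice above.
    from contextlib import ExitStack

    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_y_ctx(batch_iter):
        with ExitStack() as stack:
            # stack.enter_context(...) would register resources here.
            y = y_bc.value
            for x in batch_iter:
                yield x + y

    df.select(plus_y_ctx(col("x"))).show()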
    # $example off:scalar_iter_pandas_udf$


def grouped_map_pandas_udf_example(spark):
    # $example on:grouped_map_pandas_udf$
    from pyspark.sql.functions import pandas_udf, PandasUDFType