9786ce66c5
## What changes were proposed in this pull request?

This PR enables using grouped aggregate pandas UDFs as window functions. The semantics are the same as using a SQL aggregation function as a window function.

```
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window

>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))

>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)
... def mean_udf(v):
...     return v.mean()

>>> w = Window.partitionBy('id')

>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+
```

The scope of this PR is somewhat limited in terms of:

(1) Only supports unbounded windows, which act essentially as a group by.
(2) Only supports aggregation functions, not "transform"-like window functions (n -> n mapping).

Both of these are left as future work. In particular, (1) needs careful thinking w.r.t. how to pass rolling window data to Python efficiently; (2) is a bit easier but does require more changes, therefore I think it's better to leave it as a separate PR.

## How was this patch tested?

WindowPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21082 from icexelloss/SPARK-22239-window-udf.
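For reference, the same grouped aggregate UDF also works as a plain group-by aggregation, which is what the unbounded window reduces to. A minimal sketch reusing the `df` and `mean_udf` defined above (group order in the output may differ):

```
>>> df.groupby('id').agg(mean_udf(df['v']).alias('mean_v')).show()
+---+------+
| id|mean_v|
+---+------+
|  1|   1.5|
|  2|   6.0|
+---+------+
```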
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Worker that receives input from Piped RDD.
"""
from __future__ import print_function
import os
import sys
import time
import socket
import traceback

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.java_gateway import do_server_auth
from pyspark.taskcontext import TaskContext
from pyspark.files import SparkFiles
from pyspark.rdd import PythonEvalType
from pyspark.serializers import write_with_length, write_int, read_long, \
    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
    BatchedSerializer, ArrowStreamPandasSerializer
from pyspark.sql.types import to_arrow_type
from pyspark.util import _get_argspec, fail_on_stopiteration
from pyspark import shuffle

pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()

def report_times(outfile, boot, init, finish):
    write_int(SpecialLengths.TIMING_DATA, outfile)
    write_long(int(1000 * boot), outfile)
    write_long(int(1000 * init), outfile)
    write_long(int(1000 * finish), outfile)

def add_path(path):
    # worker can be reused, so do not add path multiple times
    if path not in sys.path:
        # overwrite system packages
        sys.path.insert(1, path)

def read_command(serializer, file):
    command = serializer._read_with_length(file)
    if isinstance(command, Broadcast):
        command = serializer.loads(command.value)
    return command

def chain(f, g):
    """chain two functions together """
    return lambda *a: g(f(*a))

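# The wrappers below adapt a user-defined Python function to the form the worker
# protocol expects. wrap_udf handles plain row-at-a-time UDFs: if the declared
# return type needs conversion (e.g. dates/timestamps), the Python result is
# converted to Spark's internal representation via toInternal.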
def wrap_udf(f, return_type):
    if return_type.needConversion():
        toInternal = return_type.toInternal
        return lambda *a: toInternal(f(*a))
    else:
        return lambda *a: f(*a)

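# A scalar pandas UDF receives its arguments as pandas.Series and must return a
# pandas.Series of the same length, e.g. (illustrative only):
#     @pandas_udf("double")
#     def plus_one(v):
#         return v + 1
# verify_result_length below rejects results that are not sized like the input.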
def wrap_scalar_pandas_udf(f, return_type):
    arrow_return_type = to_arrow_type(return_type)

    def verify_result_length(*a):
        result = f(*a)
        if not hasattr(result, "__len__"):
            raise TypeError("Return type of the user-defined function should be "
                            "Pandas.Series, but is {}".format(type(result)))
        if len(result) != len(a[0]):
            raise RuntimeError("Result vector from pandas_udf was not the required length: "
                               "expected %d, got %d" % (len(a[0]), len(result)))
        return result

    return lambda *a: (verify_result_length(*a), arrow_return_type)

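# A grouped map pandas UDF is applied once per group and may be declared with
# either one argument (the group's data as a pandas.DataFrame) or two arguments
# (the grouping key as a tuple, then the DataFrame), e.g. (illustrative only):
#     def normalize(pdf): ...
#     def normalize(key, pdf): ...
# The argspec captured in read_single_udf decides which calling convention applies.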
def wrap_grouped_map_pandas_udf(f, return_type, argspec):
    def wrapped(key_series, value_series):
        import pandas as pd

        if len(argspec.args) == 1:
            result = f(pd.concat(value_series, axis=1))
        elif len(argspec.args) == 2:
            key = tuple(s[0] for s in key_series)
            result = f(key, pd.concat(value_series, axis=1))

        if not isinstance(result, pd.DataFrame):
            raise TypeError("Return type of the user-defined function should be "
                            "pandas.DataFrame, but is {}".format(type(result)))
        if not len(result.columns) == len(return_type):
            raise RuntimeError(
                "Number of columns of the returned pandas.DataFrame "
                "doesn't match specified schema. "
                "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
        arrow_return_types = (to_arrow_type(field.dataType) for field in return_type)
        return [(result[result.columns[i]], arrow_type)
                for i, arrow_type in enumerate(arrow_return_types)]

    return wrapped

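# A grouped aggregate pandas UDF reduces one or more pandas.Series to a single
# scalar per group (e.g. v.mean()); the scalar is wrapped in a length-1
# pandas.Series so it can be shipped back over the Arrow stream.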
def wrap_grouped_agg_pandas_udf(f, return_type):
    arrow_return_type = to_arrow_type(return_type)

    def wrapped(*series):
        import pandas as pd
        result = f(*series)
        return pd.Series([result])

    return lambda *a: (wrapped(*a), arrow_return_type)

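# Illustrative example: for a 3-row partition of v = [3.0, 5.0, 10.0], a mean
# UDF returns the scalar 6.0, and the wrapper below repeats it into
# pd.Series([6.0, 6.0, 6.0]) so every row of the (unbounded) window receives
# the same aggregate value.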
def wrap_window_agg_pandas_udf(f, return_type):
    # This is similar to grouped_agg_pandas_udf, the only difference
    # is that window_agg_pandas_udf needs to repeat the return value
    # to match window length, where grouped_agg_pandas_udf just returns
    # the scalar value.
    arrow_return_type = to_arrow_type(return_type)

    def wrapped(*series):
        import pandas as pd
        result = f(*series)
        return pd.Series([result]).repeat(len(series[0]))

    return lambda *a: (wrapped(*a), arrow_return_type)

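# read_single_udf deserializes one UDF definition from the stream: the argument
# offsets it consumes, followed by one or more pickled (function, return type)
# pairs. When a column expression nests Python UDFs (e.g. udf2(udf1(col))), the
# JVM may send them as such a chain, and chain() composes them into a single
# callable before the eval-type specific wrapper is applied.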
def read_single_udf(pickleSer, infile, eval_type):
    num_arg = read_int(infile)
    arg_offsets = [read_int(infile) for i in range(num_arg)]
    row_func = None
    for i in range(read_int(infile)):
        f, return_type = read_command(pickleSer, infile)
        if row_func is None:
            row_func = f
        else:
            row_func = chain(row_func, f)

    # make sure StopIteration's raised in the user code are not ignored
    # when they are processed in a for loop, raise them as RuntimeError's instead
    func = fail_on_stopiteration(row_func)

    # the last returnType will be the return type of UDF
    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
    elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
        argspec = _get_argspec(row_func)  # signature was lost when wrapping it
        return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
    elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
        return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
    elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
        return arg_offsets, wrap_window_agg_pandas_udf(func, return_type)
    elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
        return arg_offsets, wrap_udf(func, return_type)
    else:
        raise ValueError("Unknown eval type: {}".format(eval_type))

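# read_udfs builds a single mapper over all UDFs in the stage. For example, with
# two UDFs it generates source like "lambda a: (f0(a[0]), f1(a[1], a[2]))" and
# eval()s it against the deserialized functions. All pandas UDF eval types use
# the Arrow stream serializer (parameterized by the session timezone); other
# UDFs fall back to batched pickling.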
def read_udfs(pickleSer, infile, eval_type):
    num_udfs = read_int(infile)
    udfs = {}
    call_udf = []
    mapper_str = ""
    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
        # Create function like this:
        #   lambda a: f([a[0]], [a[0], a[1]])

        # We assume there is only one UDF here because grouped map doesn't
        # support combining multiple UDFs.
        assert num_udfs == 1

        # See FlatMapGroupsInPandasExec for how arg_offsets are used to
        # distinguish between grouping attributes and data attributes
        arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
        udfs['f'] = udf
        split_offset = arg_offsets[0] + 1
        arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
        arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
        mapper_str = "lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1))
    else:
        # Create function like this:
        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
        # In the special case of a single UDF this will return a single result rather
        # than a tuple of results; this is the format that the JVM side expects.
        for i in range(num_udfs):
            arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
            udfs['f%d' % i] = udf
            args = ["a[%d]" % o for o in arg_offsets]
            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
        mapper_str = "lambda a: (%s)" % (", ".join(call_udf))

    mapper = eval(mapper_str, udfs)
    func = lambda _, it: map(mapper, it)

    if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                     PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
                     PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
        timezone = utf8_deserializer.loads(infile)
        ser = ArrowStreamPandasSerializer(timezone)
    else:
        ser = BatchedSerializer(PickleSerializer(), 100)

    # profiling is not supported for UDF
    return func, None, ser, ser

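# main() runs one task end to end: it reads the split index, checks that the
# worker and driver Python versions match, initializes the TaskContext, sys.path
# and broadcast variables, evaluates the deserialized function over the input
# stream, and finally reports timing, spill metrics and accumulator updates back
# to the JVM.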
def main(infile, outfile):
    try:
        boot_time = time.time()
        split_index = read_int(infile)
        if split_index == -1:  # for unit tests
            sys.exit(-1)

        version = utf8_deserializer.loads(infile)
        if version != "%d.%d" % sys.version_info[:2]:
            raise Exception(("Python in worker has different version %s than that in " +
                             "driver %s, PySpark cannot run with different minor versions." +
                             "Please check environment variables PYSPARK_PYTHON and " +
                             "PYSPARK_DRIVER_PYTHON are correctly set.") %
                            ("%d.%d" % sys.version_info[:2], version))

        # initialize global state
        taskContext = TaskContext._getOrCreate()
        taskContext._stageId = read_int(infile)
        taskContext._partitionId = read_int(infile)
        taskContext._attemptNumber = read_int(infile)
        taskContext._taskAttemptId = read_long(infile)
        taskContext._localProperties = dict()
        for i in range(read_int(infile)):
            k = utf8_deserializer.loads(infile)
            v = utf8_deserializer.loads(infile)
            taskContext._localProperties[k] = v

        shuffle.MemoryBytesSpilled = 0
        shuffle.DiskBytesSpilled = 0
        _accumulatorRegistry.clear()

        # fetch name of workdir
        spark_files_dir = utf8_deserializer.loads(infile)
        SparkFiles._root_directory = spark_files_dir
        SparkFiles._is_running_on_worker = True

        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
        add_path(spark_files_dir)  # *.py files that were added will be copied here
        num_python_includes = read_int(infile)
        for _ in range(num_python_includes):
            filename = utf8_deserializer.loads(infile)
            add_path(os.path.join(spark_files_dir, filename))
        if sys.version > '3':
            import importlib
            importlib.invalidate_caches()

        # fetch names and values of broadcast variables
        num_broadcast_variables = read_int(infile)
        for _ in range(num_broadcast_variables):
            bid = read_long(infile)
            if bid >= 0:
                path = utf8_deserializer.loads(infile)
                _broadcastRegistry[bid] = Broadcast(path=path)
            else:
                bid = - bid - 1
                _broadcastRegistry.pop(bid)

        _accumulatorRegistry.clear()
        eval_type = read_int(infile)
        if eval_type == PythonEvalType.NON_UDF:
            func, profiler, deserializer, serializer = read_command(pickleSer, infile)
        else:
            func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)

        init_time = time.time()

        def process():
            iterator = deserializer.load_stream(infile)
            serializer.dump_stream(func(split_index, iterator), outfile)

        if profiler:
            profiler.profile(process)
        else:
            process()
    except Exception:
        try:
            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
        except IOError:
            # JVM close the socket
            pass
        except Exception:
            # Write the error to stderr if it happened while serializing
            print("PySpark worker failed with exception:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
        sys.exit(-1)
    finish_time = time.time()
    report_times(outfile, boot_time, init_time, finish_time)
    write_long(shuffle.MemoryBytesSpilled, outfile)
    write_long(shuffle.DiskBytesSpilled, outfile)

    # Mark the beginning of the accumulators section of the output
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    write_int(len(_accumulatorRegistry), outfile)
    for (aid, accum) in _accumulatorRegistry.items():
        pickleSer._write_with_length((aid, accum._value), outfile)

    # check end of stream
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
        write_int(SpecialLengths.END_OF_STREAM, outfile)
    else:
        # write a different value to tell JVM to not reuse this worker
        write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
        sys.exit(-1)

if __name__ == '__main__':
    # Read information about how to connect back to the JVM from the environment.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("127.0.0.1", java_port))
    sock_file = sock.makefile("rwb", 65536)
    do_server_auth(sock_file, auth_secret)
    main(sock_file, sock_file)