From 92a83463c9419eef5c22b9cb382e0ef6a77fe35d Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Thu, 11 Feb 2021 10:57:02 +0900 Subject: [PATCH] [SPARK-34408][PYTHON] Refactor spark.udf.register to share the same path to generate UDF instance ### What changes were proposed in this pull request? This PR proposes to use `_create_udf` where we need to create `UserDefinedFunction` to maintain codes easier. ### Why are the changes needed? For the better readability of codes and maintenance. ### Does this PR introduce _any_ user-facing change? No, refactoring. ### How was this patch tested? Ran the existing unittests. CI in this PR should test it out too. Closes #31537 from HyukjinKwon/SPARK-34408. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- python/pyspark/sql/udf.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index c2e02a1c8c..e20b9a1096 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -36,10 +36,10 @@ def _wrap_function(sc, func, returnType): sc.pythonVer, broadcast_vars, sc._javaAccumulator) -def _create_udf(f, returnType, evalType): +def _create_udf(f, returnType, evalType, name=None, deterministic=True): # Set the name of the UserDefinedFunction object to be the name of function f udf_obj = UserDefinedFunction( - f, returnType=returnType, name=None, evalType=evalType, deterministic=True) + f, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic) return udf_obj._wrapped() @@ -208,6 +208,7 @@ class UserDefinedFunction(object): wrapper.deterministic = self.deterministic wrapper.asNondeterministic = functools.wraps( self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped()) + wrapper._unwrapped = self return wrapper def asNondeterministic(self): @@ -348,16 +349,16 @@ class UDFRegistration(object): "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, " "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or " "SQL_MAP_PANDAS_ITER_UDF.") - register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name, - evalType=f.evalType, - deterministic=f.deterministic) + register_udf = _create_udf( + f.func, returnType=f.returnType, name=name, + evalType=f.evalType, deterministic=f.deterministic)._unwrapped return_udf = f else: if returnType is None: returnType = StringType() - register_udf = UserDefinedFunction(f, returnType=returnType, name=name, - evalType=PythonEvalType.SQL_BATCHED_UDF) - return_udf = register_udf._wrapped() + return_udf = _create_udf( + f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF, name=name) + register_udf = return_udf._unwrapped self.sparkSession._jsparkSession.udf().registerPython(name, register_udf._judf) return return_udf