[SPARK-34408][PYTHON] Refactor spark.udf.register to share the same path to generate UDF instance

### What changes were proposed in this pull request?

This PR proposes to use `_create_udf` wherever we need to create a `UserDefinedFunction`, so the code is easier to maintain.
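In short, `_create_udf` gains `name` and `deterministic` parameters, and the wrapper returned by `_wrapped()` keeps a reference to the underlying `UserDefinedFunction` in `_unwrapped`, so `UDFRegistration.register` can go through `_create_udf` instead of constructing `UserDefinedFunction` directly. Below is a condensed sketch of that shared path, simplified from the diff further down; `_register_batched_udf` is only an illustrative stand-in for the `else` branch of `register`, not a real helper in PySpark.

```python
from pyspark.rdd import PythonEvalType
from pyspark.sql.types import StringType
from pyspark.sql.udf import UserDefinedFunction


def _create_udf(f, returnType, evalType, name=None, deterministic=True):
    # One place that builds the UserDefinedFunction and returns the wrapped callable.
    udf_obj = UserDefinedFunction(
        f, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic)
    wrapper = udf_obj._wrapped()
    wrapper._unwrapped = udf_obj  # in the actual patch this is set inside _wrapped()
    return wrapper


def _register_batched_udf(name, f, returnType=None):
    # Illustrative stand-in for the `else` branch of UDFRegistration.register:
    # both registration paths now reuse _create_udf and recover the raw
    # UserDefinedFunction (needed for the JVM-side registerPython call) via _unwrapped.
    if returnType is None:
        returnType = StringType()
    return_udf = _create_udf(
        f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF, name=name)
    register_udf = return_udf._unwrapped
    return register_udf, return_udf
```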

### Why are the changes needed?

For better code readability and easier maintenance.

### Does this PR introduce _any_ user-facing change?

No, refactoring.

### How was this patch tested?

Ran the existing unit tests. The CI in this PR should also cover it.

Closes #31537 from HyukjinKwon/SPARK-34408.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>

```diff
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -36,10 +36,10 @@ def _wrap_function(sc, func, returnType):
                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
 
 
-def _create_udf(f, returnType, evalType):
+def _create_udf(f, returnType, evalType, name=None, deterministic=True):
     # Set the name of the UserDefinedFunction object to be the name of function f
     udf_obj = UserDefinedFunction(
-        f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
+        f, returnType=returnType, name=name, evalType=evalType, deterministic=deterministic)
     return udf_obj._wrapped()
@@ -208,6 +208,7 @@ class UserDefinedFunction(object):
         wrapper.deterministic = self.deterministic
         wrapper.asNondeterministic = functools.wraps(
             self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
+        wrapper._unwrapped = self
         return wrapper
 
     def asNondeterministic(self):
@@ -348,16 +349,16 @@ class UDFRegistration(object):
                     "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, "
                     "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or "
                     "SQL_MAP_PANDAS_ITER_UDF.")
-            register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
-                                               evalType=f.evalType,
-                                               deterministic=f.deterministic)
+            register_udf = _create_udf(
+                f.func, returnType=f.returnType, name=name,
+                evalType=f.evalType, deterministic=f.deterministic)._unwrapped
             return_udf = f
         else:
             if returnType is None:
                 returnType = StringType()
-            register_udf = UserDefinedFunction(f, returnType=returnType, name=name,
-                                               evalType=PythonEvalType.SQL_BATCHED_UDF)
-            return_udf = register_udf._wrapped()
+            return_udf = _create_udf(
+                f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF, name=name)
+            register_udf = return_udf._unwrapped
         self.sparkSession._jsparkSession.udf().registerPython(name, register_udf._judf)
         return return_udf
```
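
For reference, a minimal usage sketch (assuming an active `SparkSession` bound to `spark`); the observable behavior of `spark.udf.register` should be unchanged by this refactoring:

```python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Register a plain Python function as a SQL UDF and call it from SQL.
spark.udf.register("plus_one", lambda x: x + 1, IntegerType())
spark.sql("SELECT plus_one(41) AS v").show()

# Registering an existing UDF object also keeps working; register() returns
# a UDF that is usable from the DataFrame API as well.
slen = udf(lambda s: len(s), IntegerType())
slen_registered = spark.udf.register("slen", slen)
spark.createDataFrame([("spark",)], ["s"]).select(slen_registered(col("s"))).show()
```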