26998b86c1
## What changes were proposed in this pull request?

This PR is an alternative approach for #24734. This PR fixes two things:

1. Respects `spark.buffer.size` in Python workers.
2. Adds a runtime buffer size configuration for Pandas UDFs, `spark.sql.pandas.udf.buffer.size` (which falls back to `spark.buffer.size`).

## How was this patch tested?

Manually tested:

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 62.68265891075134
```

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 34.00594782829285
```

Closes #24826 from HyukjinKwon/SPARK-27870.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
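The fallback behavior described above can be sketched as a plain config lookup. This is a minimal illustrative sketch, not Spark's actual implementation: `resolve_buffer_size` is a hypothetical helper, and the 65536-byte default assumes Spark's documented default for `spark.buffer.size`.

```python
# Hypothetical sketch of the fallback chain described above:
# spark.sql.pandas.udf.buffer.size -> spark.buffer.size -> Spark default.

DEFAULT_BUFFER_SIZE = 65536  # documented default for spark.buffer.size

def resolve_buffer_size(conf: dict) -> int:
    """Return the effective buffer size (in bytes) for Pandas UDF execution."""
    value = conf.get(
        "spark.sql.pandas.udf.buffer.size",
        conf.get("spark.buffer.size", DEFAULT_BUFFER_SIZE),
    )
    return int(value)

# With only the global setting, Pandas UDFs follow it:
print(resolve_buffer_size({"spark.buffer.size": "4"}))  # 4

# A Pandas-UDF-specific setting takes precedence over the global one:
print(resolve_buffer_size({
    "spark.buffer.size": "65536",
    "spark.sql.pandas.udf.buffer.size": "4",
}))  # 4
```

A small buffer (as in the `'4'` used in the manual test) flushes Arrow batches to the Python worker sooner, which is what lets `fp1` and `fp2` overlap and roughly halves the measured time.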