#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Worker that receives input from Piped RDD.
"""
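
# The worker is launched by the JVM, which writes a local socket port to this
# process's stdin (see the __main__ block at the bottom of the file); main()
# then serves a task over that socket, and the process may be kept around and
# reused for later tasks (see the end-of-stream handshake at the end of main()).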
from __future__ import print_function

import os
import sys
import time
import socket
import traceback

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.taskcontext import TaskContext
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, BatchedSerializer
from pyspark import shuffle


pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()
def report_times(outfile, boot, init, finish):
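    """Write the boot, init and finish timestamps (converted to milliseconds)
    back to the JVM, prefixed with the TIMING_DATA marker."""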
    write_int(SpecialLengths.TIMING_DATA, outfile)
    write_long(int(1000 * boot), outfile)
    write_long(int(1000 * init), outfile)
    write_long(int(1000 * finish), outfile)


def add_path(path):
    # the worker can be reused, so do not add the same path multiple times
    if path not in sys.path:
        # overwrite system packages
        sys.path.insert(1, path)
def read_command(serializer, file):
    command = serializer._read_with_length(file)
    if isinstance(command, Broadcast):
        command = serializer.loads(command.value)
    return command
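

# Directly nested UDF calls, e.g. udf1(udf2(a)), arrive as a single batch of
# functions; chain() composes them so chain(f, g)(x) is equivalent to g(f(x)).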
def chain(f, g):
    """Chain two functions together."""
    return lambda *a: g(f(*a))
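

# If the declared return type needs conversion (e.g. date/timestamp types),
# wrap the UDF so its Python result is converted to Spark SQL's internal
# representation via the type's toInternal() before being serialized back.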
def wrap_udf(f, return_type):
    if return_type.needConversion():
        toInternal = return_type.toInternal
        return lambda *a: toInternal(f(*a))
    else:
        return lambda *a: f(*a)
def read_single_udf(pickleSer, infile):
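    """Read one (possibly chained) UDF from the stream: the number of
    arguments, their column offsets, and one or more pickled functions,
    which are composed with chain()."""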
    num_arg = read_int(infile)
    arg_offsets = [read_int(infile) for i in range(num_arg)]
    row_func = None
    for i in range(read_int(infile)):
        f, return_type = read_command(pickleSer, infile)
        if row_func is None:
            row_func = f
        else:
            row_func = chain(row_func, f)
    # the last return_type read is the return type of the whole UDF
    return arg_offsets, wrap_udf(row_func, return_type)
def read_udfs(pickleSer, infile):
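    """Read all UDFs for this task and build a single mapper that evaluates
    each of them against the incoming rows, picking arguments by column offset."""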
    num_udfs = read_int(infile)
    udfs = {}
    call_udf = []
    for i in range(num_udfs):
        arg_offsets, udf = read_single_udf(pickleSer, infile)
        udfs['f%d' % i] = udf
        args = ["a[%d]" % o for o in arg_offsets]
        call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    # Create a function like this:
    #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
    # In the special case of a single UDF this will return a single result rather
    # than a tuple of results; this is the format that the JVM side expects.
    mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    mapper = eval(mapper_str, udfs)
    func = lambda _, it: map(mapper, it)
    ser = BatchedSerializer(PickleSerializer(), 100)
    # profiling is not supported for UDF
    return func, None, ser, ser
def main(infile, outfile):
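    """Run one task: read its description from infile (split index, Python
    version check, task context, files and includes, broadcasts, and the
    serialized command or UDFs), execute it, then write the results, timing,
    shuffle spill metrics and accumulator updates back to outfile."""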
    try:
        boot_time = time.time()
        split_index = read_int(infile)
        if split_index == -1:  # for unit tests
            exit(-1)

        version = utf8_deserializer.loads(infile)
        if version != "%d.%d" % sys.version_info[:2]:
            raise Exception(("Python in worker has different version %s than that in " +
                             "driver %s, PySpark cannot run with different minor versions. " +
                             "Please check environment variables PYSPARK_PYTHON and " +
                             "PYSPARK_DRIVER_PYTHON are correctly set.") %
                            ("%d.%d" % sys.version_info[:2], version))

        # initialize global state
        taskContext = TaskContext._getOrCreate()
        taskContext._stageId = read_int(infile)
        taskContext._partitionId = read_int(infile)
        taskContext._attemptNumber = read_int(infile)
        taskContext._taskAttemptId = read_long(infile)
        shuffle.MemoryBytesSpilled = 0
        shuffle.DiskBytesSpilled = 0
        _accumulatorRegistry.clear()

        # fetch name of workdir
        spark_files_dir = utf8_deserializer.loads(infile)
        SparkFiles._root_directory = spark_files_dir
        SparkFiles._is_running_on_worker = True

        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
        add_path(spark_files_dir)  # *.py files that were added will be copied here
        num_python_includes = read_int(infile)
        for _ in range(num_python_includes):
            filename = utf8_deserializer.loads(infile)
            add_path(os.path.join(spark_files_dir, filename))
        if sys.version > '3':
            import importlib
            importlib.invalidate_caches()

        # fetch names and values of broadcast variables
        num_broadcast_variables = read_int(infile)
        for _ in range(num_broadcast_variables):
            bid = read_long(infile)
            if bid >= 0:
                path = utf8_deserializer.loads(infile)
                _broadcastRegistry[bid] = Broadcast(path=path)
            else:
                bid = - bid - 1
                _broadcastRegistry.pop(bid)

        _accumulatorRegistry.clear()

        is_sql_udf = read_int(infile)
        if is_sql_udf:
            func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
        else:
            func, profiler, deserializer, serializer = read_command(pickleSer, infile)

        init_time = time.time()

        def process():
            iterator = deserializer.load_stream(infile)
            serializer.dump_stream(func(split_index, iterator), outfile)
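
        # process() pulls rows from infile through the deserializer, applies
        # func, and streams the serialized results back to outfile.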
        if profiler:
            profiler.profile(process)
        else:
            process()
    except Exception:
        try:
            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
        except IOError:
            # JVM closed the socket
            pass
        except Exception:
            # Write the error to stderr if it happened while serializing
            print("PySpark worker failed with exception:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
        exit(-1)

    finish_time = time.time()
    report_times(outfile, boot_time, init_time, finish_time)
    write_long(shuffle.MemoryBytesSpilled, outfile)
    write_long(shuffle.DiskBytesSpilled, outfile)

    # Mark the beginning of the accumulators section of the output
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    write_int(len(_accumulatorRegistry), outfile)
    for (aid, accum) in _accumulatorRegistry.items():
        pickleSer._write_with_length((aid, accum._value), outfile)

    # check end of stream
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
        write_int(SpecialLengths.END_OF_STREAM, outfile)
    else:
        # write a different value to tell JVM to not reuse this worker
        write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
        exit(-1)


if __name__ == '__main__':
    # Read a local port to connect to from stdin
    java_port = int(sys.stdin.readline())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("127.0.0.1", java_port))
    sock_file = sock.makefile("rwb", 65536)
    main(sock_file, sock_file)