Re-implement the Python broadcast using files (sketched below):
1) Serialize the Python object with cPickle and write it to disk.
2) Create a wrapper in the JVM for the dumped file, which reads the data from it during serialization.
3) Use TorrentBroadcast or HttpBroadcast to transfer the (compressed) data to the executors.
4) During deserialization, write the data back to disk.
5) Pass the path to the Python worker, which reads the data from disk and unpickles it into a Python object only on first access.
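A minimal Python sketch of steps 1 and 5, assuming hypothetical names (`dump_to_disk`, `LazyBroadcast`); the JVM wrapper and the TorrentBroadcast/HttpBroadcast transfer in steps 2-4 are omitted:
```python
import cPickle
import tempfile

def dump_to_disk(value):
    # Step 1: pickle the object and write it to a local file; the path
    # is what gets handed to the JVM-side wrapper.
    f = tempfile.NamedTemporaryFile(delete=False)
    cPickle.dump(value, f, protocol=2)
    f.close()
    return f.name

class LazyBroadcast(object):
    # Step 5: the worker receives only the path and unpickles the
    # object lazily, on first access.
    def __init__(self, path):
        self._path = path
        self._value = None

    @property
    def value(self):
        if self._value is None:
            with open(self._path, "rb") as f:
                self._value = cPickle.load(f)
        return self._value
```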
This fixes the performance regression introduced in #2659 and performs about the same as 1.1, but it supports objects larger than 2G and improves memory efficiency (only one compressed copy in the driver and in each executor).
Testing with a 500M broadcast and 4 tasks (excluding the benefit of the reused worker in 1.2):
name | 1.1 (s) | 1.2 with this patch (s) | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 25.20 | 9.33 | 170.13%
python-broadcast-w-set | 4.13 | 4.50 | -8.35%
Testing with 100 tasks (16 CPUs):
name | 1.1 (s) | 1.2 with this patch (s) | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 38.16 | 8.40 | 353.98%
python-broadcast-w-set | 23.29 | 9.59 | 142.80%
Author: Davies Liu <davies@databricks.com>
Closes #3417 from davies/pybroadcast and squashes the following commits:
50a58e0 [Davies Liu] address comments
b98de1d [Davies Liu] disable gc while unpickle
e5ee6b9 [Davies Liu] support large string
09303b8 [Davies Liu] read all data into memory
dde02dd [Davies Liu] improve performance of python broadcast
This patch adds support for broadcasting objects larger than 2G.
pickle, zlib, FrameSerializer, and Array[Byte] all cannot handle objects larger than 2G, so this patch introduces a LargeObjectSerializer for broadcast objects: the object is serialized and compressed into small chunks. It also changes the broadcast type from Broadcast[Array[Byte]] to Broadcast[Array[Array[Byte]]].
Testing broadcasts of objects larger than 2G is slow and memory-hungry, so this was tested manually; it could be added to SparkPerf.
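A hedged sketch of the chunking idea (not the actual LargeObjectSerializer): chunk the serialized bytes and compress each chunk separately, so no single byte array or zlib call ever sees more than 2G. `CHUNK_SIZE` and the helper names are illustrative:
```python
import zlib

CHUNK_SIZE = 1 << 26  # 64 MB per chunk; illustrative, not the real value

def serialize_in_chunks(data):
    # Produces the moral equivalent of Array[Array[Byte]]: a list of
    # independently compressed chunks, each far below the 2G limit.
    return [zlib.compress(data[i:i + CHUNK_SIZE])
            for i in range(0, len(data), CHUNK_SIZE)]

def deserialize_chunks(chunks):
    return "".join(zlib.decompress(c) for c in chunks)
```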
Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>
Closes #2659 from davies/huge and squashes the following commits:
7b57a14 [Davies Liu] add more tests for broadcast
28acff9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
a2f6a02 [Davies Liu] bug fix
4820613 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
5875c73 [Davies Liu] address comments
10a349b [Davies Liu] address comments
0c33016 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
6182c8f [Davies Liu] Merge branch 'master' into huge
d94b68f [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
2514848 [Davies Liu] address comments
fda395b [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
1c2d928 [Davies Liu] fix scala style
091b107 [Davies Liu] broadcast objects larger than 2G
After take(), there may be garbage left in the socket, and the next task assigned to this worker can hang because of the corrupted data.
We should make sure the socket is clean before reusing it: write END_OF_STREAM at the end, and check for it after reading out all the results from Python.
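A sketch of the sentinel handshake; the END_OF_STREAM value and the helper names are assumptions, not the actual protocol constants:
```python
import struct

END_OF_STREAM = -4  # assumed sentinel; one of the special-length markers

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

# Worker side: after writing all results, append the sentinel:
#     write_int(END_OF_STREAM, outfile)
# JVM side (shown here as Python): after draining all results, check it;
# if the sentinel is missing, the socket may still hold garbage from
# take(), so the worker must not be reused:
#     assert read_int(infile) == END_OF_STREAM
```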
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes #2838 from davies/fix_reuse and squashes the following commits:
8872914 [Davies Liu] fix tests
660875b [Davies Liu] fix bug while reuse worker after take()
Python modules added through addPyFile should take precedence over system modules.
This patch puts the paths of user-added modules at the front of sys.path (just after '').
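A minimal sketch of the change, with a hypothetical `add_path` helper:
```python
import sys

def add_path(path):
    # Insert at position 1, right after '' (the current directory), so
    # user-added modules are found before system modules.
    if path not in sys.path:
        sys.path.insert(1, path)
```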
Author: Davies Liu <davies.liu@gmail.com>
Closes #2492 from davies/path and squashes the following commits:
4a2af78 [Davies Liu] fix tests
f7ff4da [Davies Liu] add license header
6b0002f [Davies Liu] add tests
c16c392 [Davies Liu] put addPyFile in front of sys.path
Py4J cannot handle large strings efficiently, so we should automatically use a broadcast for large closures. (Broadcast uses the local filesystem to pass the data through.)
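A hedged sketch of the idea; `prepare_command` and the 1 MB threshold are illustrative, not what the patch actually uses:
```python
def prepare_command(sc, pickled_command, threshold=1 << 20):  # 1 MB
    # Large pickled closures travel as a broadcast (which goes through
    # the local filesystem) instead of as a Py4J string argument.
    if len(pickled_command) > threshold:
        return sc.broadcast(pickled_command)
    return pickled_command
```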
Author: Davies Liu <davies.liu@gmail.com>
Closes #2417 from davies/command and squashes the following commits:
fbf4e97 [Davies Liu] bugfix
aefd508 [Davies Liu] use broadcast automatically for large closure
Aggregate the number of bytes spilled to disk during aggregation or sorting, and show it in the Web UI.
![spilled](https://cloud.githubusercontent.com/assets/40902/4209758/4b995562-386d-11e4-97c1-8e838ee1d4e3.png)
This patch is blocked by SPARK-3465 (it includes a fix for that issue).
Author: Davies Liu <davies.liu@gmail.com>
Closes #2336 from davies/metrics and squashes the following commits:
e37df38 [Davies Liu] remove outdated comments
1245eb7 [Davies Liu] remove the temporary fix
ebd2f43 [Davies Liu] Merge branch 'master' into metrics
7e4ad04 [Davies Liu] Merge branch 'master' into metrics
fbe9029 [Davies Liu] show spilled bytes in Python in web ui
Reuse Python workers to avoid the overhead of forking a Python process for each task. This also tracks the broadcasts held by each worker, to avoid sending the same broadcast repeatedly.
This reduces the time for a dummy task from 22 ms to 13 ms (-40%), which helps reduce latency for Spark Streaming.
For a job with a broadcast variable (43M after compression):
```
b = sc.broadcast(set(range(30000000)))
print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count()
```
It finishes in 281 s without worker reuse and in 65 s with worker reuse (4 CPUs). Reusing the worker saves about 9 seconds of broadcast transfer and deserialization per task.
It's enabled by default and can be disabled with `spark.python.worker.reuse = false`.
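For example, worker reuse can be switched off through SparkConf:
```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.worker.reuse", "false")
sc = SparkContext(conf=conf)
```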
Author: Davies Liu <davies.liu@gmail.com>
Closes #2259 from davies/reuse-worker and squashes the following commits:
f11f617 [Davies Liu] Merge branch 'master' into reuse-worker
3939f20 [Davies Liu] fix bug in serializer in mllib
cf1c55e [Davies Liu] address comments
3133a60 [Davies Liu] fix accumulator with reused worker
760ab1f [Davies Liu] do not reuse worker if there are any exceptions
7abb224 [Davies Liu] refactor: synchronized with itself
ac3206e [Davies Liu] renaming
8911f44 [Davies Liu] synchronized getWorkerBroadcasts()
6325fc1 [Davies Liu] bugfix: bid >= 0
e0131a2 [Davies Liu] fix name of config
583716e [Davies Liu] only reuse completed and not interrupted worker
ace2917 [Davies Liu] kill python worker after timeout
6123d0f [Davies Liu] track broadcasts for each worker
8d2f08c [Davies Liu] reuse python worker
This fixes SPARK-3114, an issue where we inadvertently broke Python UDFs in Spark SQL.
This PR modifies the test runner script to always run the PySpark SQL tests, irrespective of whether Spark SQL itself has been modified. It also includes Davies' fix for the bug.
Closes #2026.
Author: Josh Rosen <joshrosen@apache.org>
Author: Davies Liu <davies.liu@gmail.com>
Closes #2027 from JoshRosen/pyspark-sql-fix and squashes the following commits:
9af2708 [Davies Liu] bugfix: disable compression of command
0d8d3a4 [Josh Rosen] Always run Python Spark SQL tests.
Passing large objects through Py4J is very slow (and costs a lot of memory), so pass broadcast objects via files instead (similar to parallelize()).
Add an option to keep the object in the driver (False by default) to save driver memory.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1912 from davies/broadcast and squashes the following commits:
e06df4a [Davies Liu] load broadcast from disk in driver automatically
db3f232 [Davies Liu] fix serialization of accumulator
631a827 [Davies Liu] Merge branch 'master' into broadcast
c7baa8c [Davies Liu] compress serialized broadcast and command
9a7161f [Davies Liu] fix doc tests
e93cf4b [Davies Liu] address comments: add test
6226189 [Davies Liu] improve large broadcast
During rdd.take(n), the JVM will close the socket once it has received enough data; the Python worker should stay silent in this case.
At the same time, the worker should not print the traceback to stderr if it has successfully sent the traceback to the JVM.
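A sketch of the worker-side behavior; `write_record` and `send_exception` are hypothetical stand-ins for the worker's real serialization helpers:
```python
import sys
import traceback

def process(records, write_record, send_exception):
    try:
        for record in records:
            write_record(record)
    except IOError:
        # The JVM closed the socket early (e.g. take(n) got enough
        # data): stay silent instead of emitting a spurious traceback.
        pass
    except Exception:
        tb = traceback.format_exc()
        try:
            send_exception(tb)
        except IOError:
            # Only if the traceback could not be delivered to the JVM
            # do we fall back to printing it on stderr.
            sys.stderr.write(tb)
```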
Author: Davies Liu <davies.liu@gmail.com>
Closes #1625 from davies/error and squashes the following commits:
4fbcc6d [Davies Liu] disable log4j during testing when exception is expected.
cc14202 [Davies Liu] keep silent in worker if JVM close the socket
This fixes https://issues.apache.org/jira/browse/SPARK-1731 by adding the Python includes to the PYTHONPATH before unpickling the broadcast values.
@airhorns
Author: Bouke van der Bijl <boukevanderbijl@gmail.com>
Closes #656 from bouk/python-includes-before-broadcast and squashes the following commits:
7b0dfe4 [Bouke van der Bijl] Add Python includes to path before depickling broadcast values
This surrounds the complete worker code with a try/except block so we catch any error that occurs, for example unpickling failing for some reason.
@JoshRosen
Author: Bouke van der Bijl <boukevanderbijl@gmail.com>
Closes #644 from bouk/catch-depickling-errors and squashes the following commits:
f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
Fixed minor typo in worker.py
Author: jyotiska <jyotiska123@gmail.com>
Closes #630 from jyotiska/pyspark_code and squashes the following commits:
ee44201 [jyotiska] typo fixed in worker.py
This fixes SPARK-1043, a bug introduced in 0.9.0
where PySpark couldn't serialize strings > 64kB.
This fix was written by @tyro89 and @bouk in #512.
This commit squashes and rebases their pull request
in order to fix some merge conflicts.
This helps in the case where an exception happens while serializing a record to
be sent to Java, leaving the stream to Java in an inconsistent state
where PythonRDD won't be able to read the error.
For now, this only adds MarshalSerializer, but it lays the groundwork
for supporting other custom serializers. Many of these mechanisms
can also be used to support deserialization of different data formats
sent by Java, such as data encoded by MsgPack.
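A usage sketch: MarshalSerializer handles only simple built-in types but is faster than pickle:
```python
from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "serializer-demo", serializer=MarshalSerializer())
print sc.parallelize(range(10)).map(lambda x: x * 2).collect()
sc.stop()
```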
This also fixes a bug in SparkContext.union().
If we support custom serializers, the Python
worker will know what type of input to expect,
so we won't need to wrap Tuple2 and Strings into
pickled tuples and strings.