spark-instrumented-optimizer/python/pyspark/daemon.py

186 lines
6 KiB
Python
Raw Normal View History

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numbers
2013-05-06 19:34:30 -04:00
import os
import signal
import select
import socket
2013-05-06 19:34:30 -04:00
import sys
import traceback
import time
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
import gc
from errno import EINTR, ECHILD, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
2013-05-06 19:34:30 -04:00
from pyspark.worker import main as worker_main
from pyspark.serializers import read_int, write_int
2013-05-06 19:34:30 -04:00
2013-06-21 12:13:48 -04:00
def compute_real_exit_code(exit_code):
[SPARK-2470] PEP8 fixes to PySpark This pull request aims to resolve all outstanding PEP8 violations in PySpark. Author: Nicholas Chammas <nicholas.chammas@gmail.com> Author: nchammas <nicholas.chammas@gmail.com> Closes #1505 from nchammas/master and squashes the following commits: 98171af [Nicholas Chammas] [SPARK-2470] revert PEP 8 fixes to cloudpickle cba7768 [Nicholas Chammas] [SPARK-2470] wrap expression list in parentheses e178dbe [Nicholas Chammas] [SPARK-2470] style - change position of line break 9127d2b [Nicholas Chammas] [SPARK-2470] wrap expression lists in parentheses 22132a4 [Nicholas Chammas] [SPARK-2470] wrap conditionals in parentheses 24639bc [Nicholas Chammas] [SPARK-2470] fix whitespace for doctest 7d557b7 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to tests.py 8f8e4c0 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to storagelevel.py b3b96cf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to statcounter.py d644477 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to worker.py aa3a7b6 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to sql.py 1916859 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to shell.py 95d1d95 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to serializers.py a0fec2e [Nicholas Chammas] [SPARK-2470] PEP8 fixes to mllib c85e1e5 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to join.py d14f2f1 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to __init__.py 81fcb20 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to resultiterable.py 1bde265 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to java_gateway.py 7fc849c [Nicholas Chammas] [SPARK-2470] PEP8 fixes to daemon.py ca2d28b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to context.py f4e0039 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to conf.py a6d5e4b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to cloudpickle.py f0a7ebf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to rddsampler.py 4dd148f [nchammas] Merge pull request #5 from apache/master f7e4581 [Nicholas Chammas] unrelated pep8 fix a36eed0 [Nicholas Chammas] name ec2 instances and security groups consistently de7292a [nchammas] Merge pull request #4 from apache/master 2e4fe00 [nchammas] Merge pull request #3 from apache/master 89fde08 [nchammas] Merge pull request #2 from apache/master 69f6e22 [Nicholas Chammas] PEP8 fixes 2627247 [Nicholas Chammas] broke up lines before they hit 100 chars 6544b7e [Nicholas Chammas] [SPARK-2065] give launched instances names 69da6cf [nchammas] Merge pull request #1 from apache/master
2014-07-22 01:30:53 -04:00
# SystemExit's code can be integer or string, but os._exit only accepts integers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1
2013-06-21 12:13:48 -04:00
def worker(sock):
"""
Called by a worker process after the fork().
"""
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
# restore the handler for SIGINT,
# it's useful for debugging (show the stacktrace before exit)
signal.signal(SIGINT, signal.default_int_handler)
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
exit_code = 0
try:
worker_main(infile, outfile)
except SystemExit as exc:
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
exit_code = compute_real_exit_code(exc.code)
finally:
outfile.flush()
return exit_code
2013-05-06 19:34:30 -04:00
# Cleanup zombie children
def cleanup_dead_children():
try:
while True:
pid, _ = os.waitpid(0, os.WNOHANG)
if not pid:
break
except:
pass
2013-05-06 19:34:30 -04:00
def manager():
# Create a new process group to corral our children
os.setpgid(0, 0)
# Create a listening socket on the AF_INET loopback interface
listen_sock = socket.socket(AF_INET, SOCK_STREAM)
2013-05-06 19:34:30 -04:00
listen_sock.bind(('127.0.0.1', 0))
listen_sock.listen(max(1024, SOMAXCONN))
2013-05-06 19:34:30 -04:00
listen_host, listen_port = listen_sock.getsockname()
write_int(listen_port, sys.stdout)
[SPARK-3094] [PySpark] compatitable with PyPy After this patch, we can run PySpark in PyPy (testing with PyPy 2.3.1 in Mac 10.9), for example: ``` PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py ``` The performance speed up will depend on work load (from 20% to 3000%). Here are some benchmarks: Job | CPython 2.7 | PyPy 2.3.1 | Speed up ------- | ------------ | ------------- | ------- Word Count | 41s | 15s | 2.7x Sort | 46s | 44s | 1.05x Stats | 174s | 3.6s | 48x Here is the code used for benchmark: ```python rdd = sc.textFile("text") def wordcount(): rdd.flatMap(lambda x:x.split('/'))\ .map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collectAsMap() def sort(): rdd.sortBy(lambda x:x, 1).count() def stats(): sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats() ``` Author: Davies Liu <davies.liu@gmail.com> Closes #2144 from davies/pypy and squashes the following commits: 9aed6c5 [Davies Liu] use protocol 2 in CloudPickle 4bc1f04 [Davies Liu] refactor b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way 3ca2351 [Davies Liu] Merge branch 'master' into pypy fae8b19 [Davies Liu] improve attrgetter, add tests 591f830 [Davies Liu] try to run tests with PyPy in run-tests c8d62ba [Davies Liu] cleanup f651fd0 [Davies Liu] fix tests using array with PyPy 1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways 3c1dbfe [Davies Liu] Merge branch 'master' into pypy 42fb5fa [Davies Liu] Merge branch 'master' into pypy cb2d724 [Davies Liu] fix tests 9986692 [Davies Liu] Merge branch 'master' into pypy 25b4ca7 [Davies Liu] support PyPy
2014-09-12 21:42:50 -04:00
sys.stdout.flush()
2013-05-06 19:34:30 -04:00
def shutdown(code):
signal.signal(SIGTERM, SIG_DFL)
# Send SIGHUP to notify workers of shutdown
os.kill(0, SIGHUP)
exit(code)
2013-05-06 19:34:30 -04:00
def handle_sigterm(*args):
shutdown(1)
signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
2013-05-06 19:34:30 -04:00
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
reuse = os.environ.get("SPARK_REUSE_WORKER")
2013-05-06 19:34:30 -04:00
# Initialization complete
try:
while True:
try:
ready_fds = select.select([0, listen_sock], [], [], 1)[0]
except select.error as ex:
if ex[0] == EINTR:
continue
else:
raise
# cleanup in signal handler will cause deadlock
cleanup_dead_children()
if 0 in ready_fds:
try:
worker_pid = read_int(sys.stdin)
except EOFError:
# Spark told us to exit by closing stdin
shutdown(0)
try:
os.kill(worker_pid, signal.SIGKILL)
except OSError:
[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that. Notes: * We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server. * I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request. * I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete. * Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo. Author: Nicholas Chammas <nicholas.chammas@gmail.com> Author: nchammas <nicholas.chammas@gmail.com> Closes #1744 from nchammas/master and squashes the following commits: 274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes 983d963 [nchammas] Merge pull request #5 from apache/master 1db5314 [nchammas] Merge pull request #4 from apache/master 0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing 6db9a44 [nchammas] Merge pull request #3 from apache/master 7b4750e [Nicholas Chammas] merge upstream changes 91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks 44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes 9da347f [nchammas] Merge pull request #2 from apache/master aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections 21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8 6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes fe57ed0 [Nicholas Chammas] removing merge conflict backups 9c01d4c [nchammas] Merge pull request #1 from apache/master 9a66cb0 [Nicholas Chammas] resolving merge conflicts a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status 723ed39 [Nicholas Chammas] always delete the report file 0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests 12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter 61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter 75ad552 [Nicholas Chammas] make check output style consistent
2014-08-06 15:58:24 -04:00
pass # process already died
if listen_sock in ready_fds:
try:
sock, _ = listen_sock.accept()
except OSError as e:
if e.errno == EINTR:
continue
raise
# Launch a worker process
try:
pid = os.fork()
except OSError as e:
if e.errno in (EAGAIN, EINTR):
time.sleep(1)
pid = os.fork() # error here will shutdown daemon
else:
outfile = sock.makefile('w')
write_int(e.errno, outfile) # Signal that the fork failed
outfile.flush()
outfile.close()
sock.close()
continue
if pid == 0:
# in child process
listen_sock.close()
try:
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
# Acknowledge that the fork was successful
outfile = sock.makefile("w")
write_int(os.getpid(), outfile)
outfile.flush()
outfile.close()
while True:
code = worker(sock)
if not reuse or code:
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
# wait for closing
try:
while sock.recv(1024):
pass
except Exception:
[SPARK-3030] [PySpark] Reuse Python worker Reuse Python worker to avoid the overhead of fork() Python process for each tasks. It also tracks the broadcasts for each worker, avoid sending repeated broadcasts. This can reduce the time for dummy task from 22ms to 13ms (-40%). It can help to reduce the latency for Spark Streaming. For a job with broadcast (43M after compress): ``` b = sc.broadcast(set(range(30000000))) print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count() ``` It will finish in 281s without reused worker, and it will finish in 65s with reused worker(4 CPUs). After reusing the worker, it can save about 9 seconds for transfer and deserialize the broadcast for each tasks. It's enabled by default, could be disabled by `spark.python.worker.reuse = false`. Author: Davies Liu <davies.liu@gmail.com> Closes #2259 from davies/reuse-worker and squashes the following commits: f11f617 [Davies Liu] Merge branch 'master' into reuse-worker 3939f20 [Davies Liu] fix bug in serializer in mllib cf1c55e [Davies Liu] address comments 3133a60 [Davies Liu] fix accumulator with reused worker 760ab1f [Davies Liu] do not reuse worker if there are any exceptions 7abb224 [Davies Liu] refactor: sychronized with itself ac3206e [Davies Liu] renaming 8911f44 [Davies Liu] synchronized getWorkerBroadcasts() 6325fc1 [Davies Liu] bugfix: bid >= 0 e0131a2 [Davies Liu] fix name of config 583716e [Davies Liu] only reuse completed and not interrupted worker ace2917 [Davies Liu] kill python worker after timeout 6123d0f [Davies Liu] track broadcasts for each worker 8d2f08c [Davies Liu] reuse python worker
2014-09-13 19:22:04 -04:00
pass
break
gc.collect()
except:
traceback.print_exc()
os._exit(1)
else:
os._exit(0)
else:
sock.close()
finally:
shutdown(1)
2013-05-06 19:34:30 -04:00
if __name__ == '__main__':
manager()