#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
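
# PySpark daemon: a manager process that pre-forks a pool of Python workers,
# which serve clients over a loopback socket by forking one short-lived
# child per connection to run worker_main.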

import os
import signal
import socket
import sys
import traceback
import multiprocessing
from ctypes import c_bool
from errno import EINTR, ECHILD
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
from pyspark.worker import main as worker_main
from pyspark.serializers import write_int

try:
    POOLSIZE = multiprocessing.cpu_count()
except NotImplementedError:
    POOLSIZE = 4
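
# Shared across fork(): the manager and every worker inherit the same memory
# segment, so setting the flag in one process is visible to all (a plain
# module global would be copied at fork time).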
exit_flag = multiprocessing.Value(c_bool, False)


def should_exit():
    global exit_flag
    return exit_flag.value


def compute_real_exit_code(exit_code):
    # SystemExit's code can be integer or string, but os._exit only accepts integers
    import numbers
    if isinstance(exit_code, numbers.Integral):
        return exit_code
    else:
        return 1
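

# One pool worker: loops on accept() and forks a child per client connection;
# the child runs worker_main over the socket and then exits.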
def worker(listen_sock):
    # Redirect stdout to stderr
    os.dup2(2, 1)
    sys.stdout = sys.stderr  # The sys.stdout object is different from file descriptor 1

    # Manager sends SIGHUP to request termination of workers in the pool
    def handle_sighup(*args):
        assert should_exit()
    signal.signal(SIGHUP, handle_sighup)

    # Cleanup zombie children
    def handle_sigchld(*args):
        pid = status = None
        try:
            while (pid, status) != (0, 0):
                pid, status = os.waitpid(0, os.WNOHANG)
        except EnvironmentError as err:
            if err.errno == EINTR:
                # retry
                handle_sigchld()
            elif err.errno != ECHILD:
                raise
    signal.signal(SIGCHLD, handle_sigchld)

    # Blocks until the socket is closed by draining the input stream
    # until it raises an exception or returns EOF.
    def waitSocketClose(sock):
        try:
            while True:
                # Empty string is returned upon EOF (and only then).
                if sock.recv(4096) == '':
                    return
        except:
            pass
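
    # accept() below is interrupted with EINTR whenever a signal (e.g. SIGHUP
    # from the manager) arrives, which lets the loop re-check should_exit()
    # instead of blocking forever.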
    # Handle clients
    while not should_exit():
        # Wait until a client arrives or we have to exit
        sock = None
        while not should_exit() and sock is None:
            try:
                sock, addr = listen_sock.accept()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    raise

        if sock is not None:
            # Fork a child to handle the client.
            # The client is handled in the child so that the manager
            # never receives SIGCHLD unless a worker crashes.
            if os.fork() == 0:
                # Leave the worker pool
                signal.signal(SIGHUP, SIG_DFL)
                listen_sock.close()
                # Read the socket using fdopen instead of socket.makefile() because the latter
                # seems to be very slow; note that we need to dup() the file descriptor because
                # otherwise writes also cause a seek that makes us miss data on the read side.
                infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
                exit_code = 0
                try:
                    worker_main(infile, outfile)
                except SystemExit as exc:
                    exit_code = exc.code
                finally:
                    outfile.flush()
                    # The Scala side will close the socket upon task completion.
                    waitSocketClose(sock)
                os._exit(compute_real_exit_code(exit_code))
            else:
                sock.close()
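

# Fork one pool worker. Everything in the fork() == 0 branch runs in the
# child, which always terminates via os._exit(); the parent returns at once
# so manager() can keep launching workers.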
def launch_worker(listen_sock):
    if os.fork() == 0:
        try:
            worker(listen_sock)
        except Exception as err:
            traceback.print_exc()
            os._exit(1)
        else:
            assert should_exit()
            os._exit(0)
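

# The manager binds the listening socket only to pass it to the pool: the
# forked workers inherit it and race to accept() connections, while the
# manager closes its own copy and never serves clients itself.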
def manager():
    # Create a new process group to corral our children
    os.setpgid(0, 0)

    # Create a listening socket on the AF_INET loopback interface
    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
    listen_host, listen_port = listen_sock.getsockname()
    write_int(listen_port, sys.stdout)

    # Launch initial worker pool
    for idx in range(POOLSIZE):
        launch_worker(listen_sock)
    listen_sock.close()

    def shutdown():
        global exit_flag
        exit_flag.value = True

    # Gracefully exit on SIGTERM, don't die on SIGHUP
    signal.signal(SIGTERM, lambda signum, frame: shutdown())
    signal.signal(SIGHUP, SIG_IGN)

    # Cleanup zombie children
    def handle_sigchld(*args):
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
            if status != 0 and not should_exit():
                raise RuntimeError("worker crashed: %s, %s" % (pid, status))
        except EnvironmentError as err:
            if err.errno not in (ECHILD, EINTR):
                raise
    signal.signal(SIGCHLD, handle_sigchld)

    # Initialization complete
    sys.stdout.close()
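    # From here on, stdin is the only channel back to the JVM parent; EOF on
    # it (the parent closing the pipe or dying) is the signal to shut down.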
    try:
        while not should_exit():
            try:
                # Spark tells us to exit by closing stdin
                if os.read(0, 512) == '':
                    shutdown()
            except EnvironmentError as err:
                if err.errno != EINTR:
                    shutdown()
                    raise
    finally:
        signal.signal(SIGTERM, SIG_DFL)
        exit_flag.value = True
        # Send SIGHUP to notify workers of shutdown
        os.kill(0, SIGHUP)


if __name__ == '__main__':
    manager()