[SPARK-2898] [PySpark] fix bugs in daemon.py

1. do not use a signal handler for SIGCHLD, since it can easily cause deadlock
2. handle EINTR during accept()
3. pass errno into the JVM
4. handle EAGAIN during fork()

Now it can pass a 50k-task test in 180 seconds.
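
As a quick illustration of fixes 1 and 2, the sketch below reaps dead children with a non-blocking waitpid() loop driven from the main select() loop instead of a SIGCHLD handler, and retries select()/accept() when a signal interrupts them (EINTR). It is a minimal standalone sketch, not the daemon.py code itself; serve() and reap_dead_children() are illustrative names.

import errno
import os
import select
import socket


def reap_dead_children():
    # Collect any exited children without blocking; called from the main loop,
    # never from a signal handler.
    try:
        while True:
            pid, _ = os.waitpid(0, os.WNOHANG)
            if not pid:
                break
    except OSError as e:
        if e.errno != errno.ECHILD:  # "no child processes" is fine
            raise


def serve(listen_sock):
    while True:
        try:
            # 1-second timeout so children are reaped at least once per second
            ready, _, _ = select.select([listen_sock], [], [], 1)
        except select.error as ex:
            if ex.args[0] == errno.EINTR:
                continue
            raise
        reap_dead_children()
        if listen_sock in ready:
            try:
                conn, _ = listen_sock.accept()
            except socket.error as e:
                if e.errno == errno.EINTR:
                    continue
                raise
            conn.close()  # a real daemon would fork a worker for this connection

Reaping from the main loop keeps waitpid() and any logging out of the signal handler, where re-entering non-reentrant code is what makes the old approach prone to deadlock.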

Author: Davies Liu <davies.liu@gmail.com>

Closes #1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] cleanup dead children every second
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in deamon.py
Authored by Davies Liu on 2014-08-10 13:00:38 -07:00; committed by Josh Rosen
parent 1d03a26a48
commit 28dcbb531a
2 changed files with 49 additions and 33 deletions

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket
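
The hunk above is the JVM side of fix 3: after connecting to the daemon, the JVM reads a single int from the socket, the worker's pid on success or a failure code that is now surfaced in the exception message. Below is a minimal sketch of the framing this relies on, assuming the usual 4-byte big-endian signed int written by PySpark's write_int and read by Java's DataInputStream.readInt; read_handshake_int is an illustrative name, not part of Spark.

import struct


def read_handshake_int(sock):
    # Read the 4-byte big-endian signed int the daemon writes on the freshly
    # accepted socket (the framing used by write_int / DataInputStream.readInt).
    buf = b""
    while len(buf) < 4:
        chunk = sock.recv(4 - len(buf))
        if not chunk:
            raise EOFError("daemon closed the connection during handshake")
        buf += chunk
    return struct.unpack("!i", buf)[0]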

python/pyspark/daemon.py

@@ -22,7 +22,8 @@ import select
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
     os._exit(compute_real_exit_code(exit_code))
 
 
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def manager():
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +141,41 @@ def manager():
                     pass  # process already died
 
             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
-                    else:
-                        sock.close()
-
                 except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
+                    else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
+                        sock.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
+                    sock.close()
+
     finally:
         shutdown(1)
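
Fix 4 is visible in the last hunk: a fork() that fails with EAGAIN (the kernel is temporarily out of resources) or EINTR is retried once after a one-second sleep instead of bringing the daemon down, while any other errno is reported back over the worker socket. Below is a minimal, hypothetical sketch of the same retry pattern in isolation; fork_with_retry is an illustrative name, not a Spark function.

import errno
import os
import time


def fork_with_retry(delay=1.0):
    # Fork, retrying once after a short sleep if the kernel is temporarily out of
    # resources (EAGAIN) or the call was interrupted by a signal (EINTR).
    try:
        return os.fork()
    except OSError as e:
        if e.errno in (errno.EAGAIN, errno.EINTR):
            time.sleep(delay)
            return os.fork()  # a second failure propagates to the caller
        raise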