[SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon
## What changes were proposed in this pull request?
The PySpark worker daemon reads from stdin the PIDs of the workers to kill (see `python/pyspark/daemon.py`, L127 at commit 1bb60ab839).
However, the worker process is forked from the worker daemon process, and we did not close stdin in the child after the fork. This means the child process and the user program can also read from stdin, which blocks the daemon from receiving the PID to kill. This can cause problems because the task reaper may detect that the task was not terminated and eventually kill the JVM.
This PR fixes the issue by redirecting the standard input of the forked child to /dev/null.
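The redirection described above can be sketched outside Spark as follows. This is a minimal illustration, not the patch itself: it rebinds the current process's file descriptor 0 to `/dev/null` with `os.dup2`, after which any read from stdin returns EOF immediately instead of blocking.

```python
import os

# Open /dev/null and duplicate it onto file descriptor 0 (stdin),
# the same pattern the patch applies in the forked child.
devnull = os.open(os.devnull, os.O_RDONLY)
os.dup2(devnull, 0)   # fd 0 now refers to /dev/null
os.close(devnull)     # the duplicate on fd 0 stays open

# Reading stdin now yields EOF immediately rather than blocking.
data = os.read(0, 1024)
print(data)  # b''
```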
## How was this patch tested?
Manually tested.
In `pyspark`, run:
```python
import subprocess

def task(_):
    subprocess.check_output(["cat"])

sc.parallelize(range(1), 1).mapPartitions(task).count()
```
Before:
The job gets stuck. Pressing Ctrl+C exits the job, but the Python worker process does not exit.
After:
The job finishes correctly. `cat` prints nothing (because the dummy stdin is `/dev/null`), and the Python worker process exits normally.
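The "after" behaviour of the manual test can be reproduced without Spark. In this sketch (an illustration, not the PR's code), we run `cat` with its stdin explicitly bound to `/dev/null`, mirroring what the fixed worker's children now inherit; it assumes a POSIX system with `cat` on the PATH.

```python
import subprocess

# With stdin bound to /dev/null, cat sees EOF at once and exits
# instead of blocking forever on an inherited, shared stdin.
out = subprocess.check_output(["cat"], stdin=subprocess.DEVNULL)
print(out)  # b''
```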
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes #25138 from WeichenXu123/SPARK-26175.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Parent commit: ee3c1c777d
This commit: 3b14088541
In `python/pyspark/daemon.py`:

```diff
@@ -160,6 +160,21 @@ def manager():
             if pid == 0:
                 # in child process
                 listen_sock.close()
+
+                # It should close the standard input in the child process so that
+                # Python native function executions stay intact.
+                #
+                # Note that if we just close the standard input (file descriptor 0),
+                # the lowest file descriptor (file descriptor 0) will be allocated,
+                # later when other file descriptors happen to be opened.
+                #
+                # Therefore, here we redirect it to '/dev/null' by duplicating
+                # another file descriptor for '/dev/null' to the standard input (0).
+                # See SPARK-26175.
+                devnull = open(os.devnull, 'r')
+                os.dup2(devnull.fileno(), 0)
+                devnull.close()
+
                 try:
                     # Acknowledge that the fork was successful
                     outfile = sock.makefile(mode="wb")
```
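The comment in the hunk above explains why the patch uses `dup2` rather than simply closing file descriptor 0. The hazard can be demonstrated directly (a standalone sketch, not Spark code): POSIX `open()` always returns the lowest unused descriptor, so after `close(0)` the next file the process opens silently becomes its "stdin".

```python
import os

os.close(0)  # naive approach: just close stdin
# The very next open() reuses the lowest free descriptor, i.e. 0,
# so an unrelated file would silently take over the stdin slot.
fd = os.open(os.devnull, os.O_RDONLY)
print(fd)  # 0
```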
In the PySpark SQL UDF tests:

```diff
@@ -616,6 +616,18 @@ class UDFTests(ReusedSQLTestCase):
         self.spark.range(1).select(f()).collect()
 
+    def test_worker_original_stdin_closed(self):
+        # Test if it closes the original standard input of worker inherited from the daemon,
+        # and replaces it with '/dev/null'. See SPARK-26175.
+        def task(iterator):
+            import sys
+            res = sys.stdin.read()
+            # Because the standard input is '/dev/null', it reaches EOF.
+            assert res == '', "Expect read EOF from stdin."
+            return iterator
+
+        self.sc.parallelize(range(1), 1).mapPartitions(task).count()
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
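The invariant that the regression test asserts inside the worker can also be checked outside Spark. This sketch (an illustration under the assumption that stdin is rebound to `/dev/null`, as the patch arranges) shows that the high-level `sys.stdin.read()` then returns an empty string, exactly what the test's assertion expects.

```python
import os
import sys

# Bind this process's sys.stdin to /dev/null, mimicking the worker's
# post-fork state, then read: EOF yields the empty string.
sys.stdin = open(os.devnull, 'r')
res = sys.stdin.read()
assert res == '', "Expect read EOF from stdin."
print(repr(res))  # ''
```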