diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 6f42ad36ba..97b6b257ea 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -160,6 +160,21 @@ def manager():
             if pid == 0:
                 # in child process
                 listen_sock.close()
+
+                # The standard input should be closed in the child process so that
+                # Python native function executions stay intact.
+                #
+                # Note that if we just close the standard input (file descriptor 0),
+                # file descriptor 0 becomes the lowest free descriptor and will be
+                # reused when another file happens to be opened later.
+                #
+                # Therefore, here we redirect it to '/dev/null' by duplicating
+                # a file descriptor opened on '/dev/null' onto the standard input (0).
+                # See SPARK-26175.
+                devnull = open(os.devnull, 'r')
+                os.dup2(devnull.fileno(), 0)
+                devnull.close()
+
                 try:
                     # Acknowledge that the fork was successful
                     outfile = sock.makefile(mode="wb")
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 803d471c8c..1999311d09 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -616,6 +616,18 @@ class UDFTests(ReusedSQLTestCase):
 
         self.spark.range(1).select(f()).collect()
 
+    def test_worker_original_stdin_closed(self):
+        # Test that the worker closes the original standard input inherited from the daemon
+        # and replaces it with '/dev/null'. See SPARK-26175.
+        def task(iterator):
+            import sys
+            res = sys.stdin.read()
+            # Because the standard input is '/dev/null', the read reaches EOF immediately.
+            assert res == '', "Expected to read EOF from stdin."
+            return iterator
+
+        self.sc.parallelize(range(1), 1).mapPartitions(task).count()
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
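
For readers unfamiliar with the file-descriptor detail the daemon.py comment alludes to, here is a minimal standalone sketch, not part of the patch, of why the child redirects standard input with `os.dup2` instead of simply closing it. It assumes a POSIX-like system where descriptors 0-2 are open when the script starts; the variable names are illustrative only.

```python
import os

# Open '/dev/null' up front; with descriptors 0-2 taken it lands on some fd > 2.
devnull = open(os.devnull, 'r')

# The hazard the daemon.py comment describes: close fd 0 outright and the very
# next open() silently reuses descriptor 0 as "standard input".
os.close(0)
probe = open(os.devnull, 'r')
print("fd after a plain close:", probe.fileno())   # prints 0
probe.close()

# The patch's approach: keep fd 0 occupied by duplicating '/dev/null' onto it.
os.dup2(devnull.fileno(), 0)   # fd 0 now refers to '/dev/null'; reads hit EOF
devnull.close()                # the extra '/dev/null' descriptor is no longer needed
later = open(os.devnull, 'r')
print("fd after the dup2 trick:", later.fileno())  # some descriptor other than 0
later.close()
```

With fd 0 pinned to '/dev/null', a read from `sys.stdin` in the worker returns an immediate EOF, which is exactly what the new `test_worker_original_stdin_closed` test asserts.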