spark-instrumented-optimizer/python/pyspark/tests/test_daemon.py
zero323 31a16fbb40 [SPARK-32714][PYTHON] Initial pyspark-stubs port
### What changes were proposed in this pull request?

This PR proposes migration of [`pyspark-stubs`](https://github.com/zero323/pyspark-stubs) into Spark codebase.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds type annotations directly to Spark source.

This can impact interaction with development tools for users, which haven't used `pyspark-stubs`.

### How was this patch tested?

- [x] MyPy tests of the PySpark source
    ```
    mypy --no-incremental --config python/mypy.ini python/pyspark
    ```
- [x] MyPy tests of Spark examples
    ```
   MYPYPATH=python/ mypy --no-incremental --config python/mypy.ini examples/src/main/python/ml examples/src/main/python/sql examples/src/main/python/sql/streaming
    ```
- [x] Existing Flake8 linter

- [x] Existing unit tests

Tested against:

- `mypy==0.790+dev.e959952d9001e9713d329a2f9b196705b028f894`
- `mypy==0.782`

Closes #29591 from zero323/SPARK-32681.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-24 14:15:36 +09:00

84 lines
2.9 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import time
import unittest
from pyspark.serializers import read_int
class DaemonTests(unittest.TestCase):
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('127.0.0.1', port))
# send a split index of -1 to shutdown the worker
sock.send(b"\xFF\xFF\xFF\xFF")
sock.close()
return True
def do_termination_test(self, terminator):
from subprocess import Popen, PIPE
from errno import ECONNREFUSED
# start daemon
daemon_path = os.path.join(os.path.dirname(__file__), "..", "daemon.py")
python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON")
daemon = Popen([python_exec, daemon_path], stdin=PIPE, stdout=PIPE)
# read the port number
port = read_int(daemon.stdout)
# daemon should accept connections
self.assertTrue(self.connect(port))
# wait worker process spawned from daemon exit.
time.sleep(1)
# request shutdown
terminator(daemon)
time.sleep(1)
# daemon should no longer accept connections
try:
self.connect(port)
except EnvironmentError as exception:
self.assertEqual(exception.errno, ECONNREFUSED)
else:
self.fail("Expected EnvironmentError to be raised")
def test_termination_stdin(self):
"""Ensure that daemon and workers terminate when stdin is closed."""
self.do_termination_test(lambda daemon: daemon.stdin.close())
def test_termination_sigterm(self):
"""Ensure that daemon and workers terminate on SIGTERM."""
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
if __name__ == "__main__":
from pyspark.tests.test_daemon import * # noqa: F401
try:
import xmlrunner # type: ignore[import]
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)