spark-instrumented-optimizer/python/pyspark/tests/test_pin_thread.py
zero323 31a16fbb40 [SPARK-32714][PYTHON] Initial pyspark-stubs port
### What changes were proposed in this pull request?

This PR proposes migration of [`pyspark-stubs`](https://github.com/zero323/pyspark-stubs) into Spark codebase.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds type annotations directly to Spark source.

This may change how development tools (type checkers, IDEs) behave for users who haven't previously used `pyspark-stubs`.
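
For example (a hypothetical snippet, not taken from this PR; the exact diagnostic wording depends on the mypy version), a type checker can now flag PySpark API misuse without a separately installed stub package:

```
from pyspark import SparkContext

# mypy should reject the bad keyword argument below, roughly:
#   error: Argument "appName" to "SparkContext" has incompatible type "int";
#   expected "Optional[str]"
sc = SparkContext(master="local[1]", appName=42)
```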

### How was this patch tested?

- [x] MyPy tests of the PySpark source
    ```
    mypy --no-incremental --config python/mypy.ini python/pyspark
    ```
- [x] MyPy tests of Spark examples
    ```
    MYPYPATH=python/ mypy --no-incremental --config python/mypy.ini examples/src/main/python/ml examples/src/main/python/sql examples/src/main/python/sql/streaming
    ```
- [x] Existing Flake8 linter

- [x] Existing unit tests

Tested against:

- `mypy==0.790+dev.e959952d9001e9713d329a2f9b196705b028f894`
- `mypy==0.782`

Closes #29591 from zero323/SPARK-32681.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-09-24 14:15:36 +09:00

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import threading
import unittest

from pyspark import SparkContext, SparkConf, InheritableThread


class PinThreadTests(unittest.TestCase):
    # These tests are in a separate class because they use the
    # 'PYSPARK_PIN_THREAD' environment variable to test the thread-pinning feature.

    @classmethod
    def setUpClass(cls):
        cls.old_pin_thread = os.environ.get("PYSPARK_PIN_THREAD")
        os.environ["PYSPARK_PIN_THREAD"] = "true"
        cls.sc = SparkContext('local[4]', cls.__name__, conf=SparkConf())

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()
        if cls.old_pin_thread is not None:
            os.environ["PYSPARK_PIN_THREAD"] = cls.old_pin_thread
        else:
            del os.environ["PYSPARK_PIN_THREAD"]
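
    # In pinned-thread mode (PYSPARK_PIN_THREAD=true), Py4J pairs each Python
    # thread with its own dedicated JVM thread, so JVM-side thread locals such
    # as SparkContext local properties are isolated per Python thread.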
    def test_pinned_thread(self):
        threads = []
        exceptions = []
        property_name = "test_property_%s" % PinThreadTests.__name__
        jvm_thread_ids = []

        for i in range(10):
            def test_local_property(i=i):
                # ``i=i`` binds the loop variable at definition time; otherwise
                # every thread would observe the final value of ``i``.
                jvm_thread_id = self.sc._jvm.java.lang.Thread.currentThread().getId()
                jvm_thread_ids.append(jvm_thread_id)

                # If a property is set in this thread, it should later be read
                # back unchanged within the same thread.
                self.sc.setLocalProperty(property_name, str(i))

                # 5 threads sleep for 1 second; 5 threads do not sleep.
                time.sleep(i % 2)

                try:
                    assert self.sc.getLocalProperty(property_name) == str(i)

                    # Each command might create a thread in multi-threading mode in Py4J.
                    # This assert makes sure that the created thread is being reused.
                    assert jvm_thread_id == self.sc._jvm.java.lang.Thread.currentThread().getId()
                except Exception as e:
                    exceptions.append(e)

            threads.append(threading.Thread(target=test_local_property))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for e in exceptions:
            raise e

        # 10 distinct JVM threads should have been created because there are
        # 10 Python threads.
        assert len(set(jvm_thread_ids)) == 10
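
    # setJobGroup stores the group ID in a JVM-thread-local property
    # (spark.jobGroup.id), and cancelJobGroup cancels every active job in that
    # group. With pinned threads, each Python thread keeps its own job group
    # instead of sharing one JVM thread's state, which the test below relies on.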
    def test_multiple_group_jobs(self):
        # SPARK-22340 Add a mode to pin Python thread into JVM's
        group_a = "job_ids_to_cancel"
        group_b = "job_ids_to_run"

        threads = []
        thread_ids = range(4)
        thread_ids_to_cancel = [i for i in thread_ids if i % 2 == 0]
        thread_ids_to_run = [i for i in thread_ids if i % 2 != 0]

        # A list which records whether each job was cancelled.
        # The index of the list is the index of the thread the job ran in.
        is_job_cancelled = [False for _ in thread_ids]

        def run_job(job_group, index):
            """
            Executes a job in the group ``job_group``. Each job sleeps for
            15 seconds and then exits.
            """
            try:
                self.sc.setJobGroup(job_group, "test rdd collect with setting job group")
                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect()
                is_job_cancelled[index] = False
            except Exception:
                # Assume that an exception means job cancellation.
                is_job_cancelled[index] = True

        # Test that the job succeeds when it is not cancelled.
        run_job(group_a, 0)
        self.assertFalse(is_job_cancelled[0])

        # Run jobs
        for i in thread_ids_to_cancel:
            t = threading.Thread(target=run_job, args=(group_a, i))
            t.start()
            threads.append(t)
        for i in thread_ids_to_run:
            t = threading.Thread(target=run_job, args=(group_b, i))
            t.start()
            threads.append(t)

        # Wait to make sure all the jobs have started running.
        time.sleep(3)

        # Then cancel one job group.
        self.sc.cancelJobGroup(group_a)

        # Wait until all threads launching jobs are finished.
        for t in threads:
            t.join()

        for i in thread_ids_to_cancel:
            self.assertTrue(
                is_job_cancelled[i],
                "Thread {i}: Job in group A was not cancelled.".format(i=i))

        for i in thread_ids_to_run:
            self.assertFalse(
                is_job_cancelled[i],
                "Thread {i}: Job in group B did not succeed.".format(i=i))
    def test_inheritable_local_property(self):
        self.sc.setLocalProperty("a", "hi")
        expected = []

        def get_inner_local_prop():
            expected.append(self.sc.getLocalProperty("b"))

        def get_outer_local_prop():
            expected.append(self.sc.getLocalProperty("a"))
            self.sc.setLocalProperty("b", "hello")
            t2 = InheritableThread(target=get_inner_local_prop)
            t2.start()
            t2.join()

        t1 = InheritableThread(target=get_outer_local_prop)
        t1.start()
        t1.join()

        self.assertEqual(self.sc.getLocalProperty("b"), None)
        self.assertEqual(expected, ["hi", "hello"])
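

# Standalone entry point: running this file as a script uses xmlrunner (when
# available) to write JUnit-style XML reports to target/test-reports, falling
# back to unittest's default text runner otherwise.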
if __name__ == "__main__":
    import unittest
    from pyspark.tests.test_pin_thread import *  # noqa: F401

    try:
        import xmlrunner  # type: ignore[import]
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)