#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import random
import threading
import unittest

from pyspark import SparkContext, SparkConf

class PinThreadTests(unittest.TestCase):
    # These tests are in a separate class because it uses
    # 'PYSPARK_PIN_THREAD' environment variable to test thread pin feature.

    @classmethod
    def setUpClass(cls):
        # Remember any pre-existing value so tearDownClass can restore the
        # environment exactly as it was.
        cls.old_pin_thread = os.environ.get("PYSPARK_PIN_THREAD")
        os.environ["PYSPARK_PIN_THREAD"] = "true"
        cls.sc = SparkContext('local[4]', cls.__name__, conf=SparkConf())

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()
        # Restore (or remove) the environment variable set in setUpClass.
        if cls.old_pin_thread is not None:
            os.environ["PYSPARK_PIN_THREAD"] = cls.old_pin_thread
        else:
            del os.environ["PYSPARK_PIN_THREAD"]

    def test_pinned_thread(self):
        """
        Checks that each Python thread is pinned to its own JVM thread: local
        properties set in one Python thread are isolated from other threads,
        and repeated JVM calls from the same Python thread reuse one JVM thread.
        """
        threads = []
        exceptions = []
        property_name = "test_property_%s" % PinThreadTests.__name__
        jvm_thread_ids = []

        for i in range(10):

            # NOTE: 'i' is bound as a default argument to capture its value at
            # definition time. A plain closure over the loop variable would be
            # late-bound, and every thread (all started after the loop ends)
            # would observe the final value i == 9, defeating the staggered
            # sleep below.
            def test_local_property(i=i):
                jvm_thread_id = self.sc._jvm.java.lang.Thread.currentThread().getId()
                jvm_thread_ids.append(jvm_thread_id)

                # If a property is set in this thread, later it should get the same property
                # within this thread.
                self.sc.setLocalProperty(property_name, str(i))

                # 5 threads, 1 second sleep. 5 threads without a sleep.
                time.sleep(i % 2)

                try:
                    assert self.sc.getLocalProperty(property_name) == str(i)

                    # Each command might create a thread in multi-threading mode in Py4J.
                    # This assert makes sure that the created thread is being reused.
                    assert jvm_thread_id == self.sc._jvm.java.lang.Thread.currentThread().getId()
                except Exception as e:
                    # Exceptions raised in worker threads would otherwise be
                    # swallowed; collect them and re-raise in the main thread.
                    exceptions.append(e)

            threads.append(threading.Thread(target=test_local_property))

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        # Surface any assertion failure collected in the worker threads.
        for e in exceptions:
            raise e

        # Created JVM threads should be 10 because Python threads are 10.
        assert len(set(jvm_thread_ids)) == 10

    def test_multiple_group_jobs(self):
        # SPARK-22340: with the pinned thread mode, each Python thread maps to
        # its own JVM thread, so job groups set from different Python threads
        # do not interfere and can be cancelled independently.

        group_a = "job_ids_to_cancel"
        group_b = "job_ids_to_run"

        threads = []
        thread_ids = range(4)
        thread_ids_to_cancel = [i for i in thread_ids if i % 2 == 0]
        thread_ids_to_run = [i for i in thread_ids if i % 2 != 0]

        # A list which records whether job is cancelled.
        # The index of the array is the thread index which job run in.
        is_job_cancelled = [False for _ in thread_ids]

        def run_job(job_group, index):
            """
            Executes a job with the group ``job_group``. The job sleeps long
            enough (15 seconds) that it is still running when the caller
            cancels its group ~3 seconds in.
            """
            try:
                self.sc.setJobGroup(job_group, "test rdd collect with setting job group")
                self.sc.parallelize([15]).map(lambda x: time.sleep(x)).collect()
                is_job_cancelled[index] = False
            except Exception:
                # Assume that exception means job cancellation.
                is_job_cancelled[index] = True

        # Test if job succeeded when not cancelled.
        run_job(group_a, 0)
        self.assertFalse(is_job_cancelled[0])

        # Run jobs
        for i in thread_ids_to_cancel:
            t = threading.Thread(target=run_job, args=(group_a, i))
            t.start()
            threads.append(t)

        for i in thread_ids_to_run:
            t = threading.Thread(target=run_job, args=(group_b, i))
            t.start()
            threads.append(t)

        # Wait to make sure all jobs are executed.
        time.sleep(3)
        # And then, cancel one job group.
        self.sc.cancelJobGroup(group_a)

        # Wait until all threads launching jobs are finished.
        for t in threads:
            t.join()

        for i in thread_ids_to_cancel:
            self.assertTrue(
                is_job_cancelled[i],
                "Thread {i}: Job in group A was not cancelled.".format(i=i))

        for i in thread_ids_to_run:
            self.assertFalse(
                is_job_cancelled[i],
                "Thread {i}: Job in group B did not succeed.".format(i=i))
if __name__ == "__main__":
    import unittest

    from pyspark.tests.test_pin_thread import *

    # Prefer JUnit-style XML reports when xmlrunner is installed; otherwise
    # fall back to unittest's default text runner (testRunner=None).
    try:
        import xmlrunner
    except ImportError:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    unittest.main(testRunner=runner, verbosity=2)