spark-instrumented-optimizer/python/pyspark/tests/test_taskcontext.py
wuyi cbad616d4c [SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
## What changes were proposed in this pull request?

In this PR, we implements a complete process of GPU-aware resources scheduling
in Standalone. The whole process looks like: Worker sets up isolated resources
when it starts up and registers to master along with its resources. And, Master
picks up usable workers according to driver/executor's resource requirements to
launch driver/executor on them. Then, Worker launches the driver/executor after
preparing resources file, which is created under driver/executor's working directory,
with specified resource addresses(told by master). When driver/executor finished,
their resources could be recycled to worker. Finally, if a worker stops, it
should always release its resources firstly.

For the case of Workers and Drivers in **client** mode run on the same host, we introduce
a config option named `spark.resources.coordinate.enable`(default true) to indicate
whether Spark should coordinate resources for user. If `spark.resources.coordinate.enable=false`, user should be responsible for configuring different resources for Workers and Drivers when use resourcesFile or discovery script. If true, Spark would help user to assign different  resources for Workers and Drivers.

The solution for Spark to coordinate resources among Workers and Drivers is:

Generally, use a shared file named *____allocated_resources____.json* to sync allocated
resources info among Workers and Drivers on the same host.

After a Worker or Driver found all resources using the configured resourcesFile and/or
discovery script during launching, it should filter out available resources by excluding resources already allocated in *____allocated_resources____.json* and acquire resources from available resources according to its own requirement. After that, it should write its allocated resources along with its process id (pid) into *____allocated_resources____.json*.  Pid (proposed by tgravescs) here used to check whether the allocated resources are still valid in case of Worker or Driver crashes and doesn't release resources properly. And when a Worker or Driver finished, normally, it would always clean up its own allocated resources in *____allocated_resources____.json*.

Note that we'll always get a file lock before any access to file *____allocated_resources____.json*
and release the lock finally.

Futhermore, we appended resources info in `WorkerSchedulerStateResponse` to work
around master change behaviour in HA mode.

## How was this patch tested?

Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.

Manually tested with client/cluster mode (e.g. multiple workers) in a single node Standalone.

Closes #25047 from Ngone51/SPARK-27371.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-08-09 07:49:03 -05:00

227 lines
9.1 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import random
import stat
import sys
import tempfile
import time
import unittest
from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext
from pyspark.testing.utils import PySparkTestCase, SPARK_HOME
class TaskContextTests(PySparkTestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
# Allow retries even though they are normally disabled in local mode
self.sc = SparkContext('local[4, 2]', class_name)
def test_stage_id(self):
"""Test the stage ids are available and incrementing as expected."""
rdd = self.sc.parallelize(range(10))
stage1 = rdd.map(lambda x: TaskContext.get().stageId()).take(1)[0]
stage2 = rdd.map(lambda x: TaskContext.get().stageId()).take(1)[0]
# Test using the constructor directly rather than the get()
stage3 = rdd.map(lambda x: TaskContext().stageId()).take(1)[0]
self.assertEqual(stage1 + 1, stage2)
self.assertEqual(stage1 + 2, stage3)
self.assertEqual(stage2 + 1, stage3)
def test_resources(self):
"""Test the resources are empty by default."""
rdd = self.sc.parallelize(range(10))
resources1 = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
# Test using the constructor directly rather than the get()
resources2 = rdd.map(lambda x: TaskContext().resources()).take(1)[0]
self.assertEqual(len(resources1), 0)
self.assertEqual(len(resources2), 0)
def test_partition_id(self):
"""Test the partition id."""
rdd1 = self.sc.parallelize(range(10), 1)
rdd2 = self.sc.parallelize(range(10), 2)
pids1 = rdd1.map(lambda x: TaskContext.get().partitionId()).collect()
pids2 = rdd2.map(lambda x: TaskContext.get().partitionId()).collect()
self.assertEqual(0, pids1[0])
self.assertEqual(0, pids1[9])
self.assertEqual(0, pids2[0])
self.assertEqual(1, pids2[9])
def test_attempt_number(self):
"""Verify the attempt numbers are correctly reported."""
rdd = self.sc.parallelize(range(10))
# Verify a simple job with no failures
attempt_numbers = rdd.map(lambda x: TaskContext.get().attemptNumber()).collect()
map(lambda attempt: self.assertEqual(0, attempt), attempt_numbers)
def fail_on_first(x):
"""Fail on the first attempt so we get a positive attempt number"""
tc = TaskContext.get()
attempt_number = tc.attemptNumber()
partition_id = tc.partitionId()
attempt_id = tc.taskAttemptId()
if attempt_number == 0 and partition_id == 0:
raise Exception("Failing on first attempt")
else:
return [x, partition_id, attempt_number, attempt_id]
result = rdd.map(fail_on_first).collect()
# We should re-submit the first partition to it but other partitions should be attempt 0
self.assertEqual([0, 0, 1], result[0][0:3])
self.assertEqual([9, 3, 0], result[9][0:3])
first_partition = filter(lambda x: x[1] == 0, result)
map(lambda x: self.assertEqual(1, x[2]), first_partition)
other_partitions = filter(lambda x: x[1] != 0, result)
map(lambda x: self.assertEqual(0, x[2]), other_partitions)
# The task attempt id should be different
self.assertTrue(result[0][3] != result[9][3])
def test_tc_on_driver(self):
"""Verify that getting the TaskContext on the driver returns None."""
tc = TaskContext.get()
self.assertTrue(tc is None)
def test_get_local_property(self):
"""Verify that local properties set on the driver are available in TaskContext."""
key = "testkey"
value = "testvalue"
self.sc.setLocalProperty(key, value)
try:
rdd = self.sc.parallelize(range(1), 1)
prop1 = rdd.map(lambda _: TaskContext.get().getLocalProperty(key)).collect()[0]
self.assertEqual(prop1, value)
prop2 = rdd.map(lambda _: TaskContext.get().getLocalProperty("otherkey")).collect()[0]
self.assertTrue(prop2 is None)
finally:
self.sc.setLocalProperty(key, None)
def test_barrier(self):
"""
Verify that BarrierTaskContext.barrier() performs global sync among all barrier tasks
within a stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
yield sum(iterator)
def context_barrier(x):
tc = BarrierTaskContext.get()
time.sleep(random.randint(1, 10))
tc.barrier()
return time.time()
times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
self.assertTrue(max(times) - min(times) < 1)
def test_barrier_infos(self):
"""
Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
barrier stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
yield sum(iterator)
taskInfos = rdd.barrier().mapPartitions(f).map(lambda x: BarrierTaskContext.get()
.getTaskInfos()).collect()
self.assertTrue(len(taskInfos) == 4)
self.assertTrue(len(taskInfos[0]) == 4)
class TaskContextTestsWithWorkerReuse(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
conf = SparkConf().set("spark.python.worker.reuse", "true")
self.sc = SparkContext('local[2]', class_name, conf=conf)
def test_barrier_with_python_worker_reuse(self):
"""
Regression test for SPARK-25921: verify that BarrierTaskContext.barrier() with
reused python worker.
"""
# start a normal job first to start all workers and get all worker pids
worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()
# the worker will reuse in this barrier job
rdd = self.sc.parallelize(range(10), 2)
def f(iterator):
yield sum(iterator)
def context_barrier(x):
tc = BarrierTaskContext.get()
time.sleep(random.randint(1, 10))
tc.barrier()
return (time.time(), os.getpid())
result = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
times = list(map(lambda x: x[0], result))
pids = list(map(lambda x: x[1], result))
# check both barrier and worker reuse effect
self.assertTrue(max(times) - min(times) < 1)
for pid in pids:
self.assertTrue(pid in worker_pids)
def tearDown(self):
self.sc.stop()
class TaskContextTestsWithResources(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
self.tempFile = tempfile.NamedTemporaryFile(delete=False)
self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
self.tempFile.close()
os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
stat.S_IROTH | stat.S_IXOTH)
conf = SparkConf().set("spark.test.home", SPARK_HOME)
conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name)
conf = conf.set("spark.worker.resource.gpu.amount", 1)
conf = conf.set("spark.task.resource.gpu.amount", "1")
conf = conf.set("spark.executor.resource.gpu.amount", "1")
self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)
def test_resources(self):
"""Test the resources are available."""
rdd = self.sc.parallelize(range(10))
resources = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
self.assertEqual(len(resources), 1)
self.assertTrue('gpu' in resources)
self.assertEqual(resources['gpu'].name, 'gpu')
self.assertEqual(resources['gpu'].addresses, ['0'])
def tearDown(self):
os.unlink(self.tempFile.name)
self.sc.stop()
if __name__ == "__main__":
import unittest
from pyspark.tests.test_taskcontext import *
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)