#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import random
import shutil
import stat
import sys
import tempfile
import time
import unittest
from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext
from pyspark.testing.utils import PySparkTestCase, SPARK_HOME
if sys.version_info[0] >= 3:
    xrange = range


class TaskContextTests(PySparkTestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
# Allow retries even though they are normally disabled in local mode
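        # 'local[4, 2]' runs 4 worker threads with maxFailures=2, so a failed
        # task gets retried once.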
self.sc = SparkContext('local[4, 2]', class_name)
def test_stage_id(self):
"""Test the stage ids are available and incrementing as expected."""
rdd = self.sc.parallelize(range(10))
stage1 = rdd.map(lambda x: TaskContext.get().stageId()).take(1)[0]
stage2 = rdd.map(lambda x: TaskContext.get().stageId()).take(1)[0]
# Test using the constructor directly rather than the get()
stage3 = rdd.map(lambda x: TaskContext().stageId()).take(1)[0]
self.assertEqual(stage1 + 1, stage2)
self.assertEqual(stage1 + 2, stage3)
self.assertEqual(stage2 + 1, stage3)
def test_resources(self):
"""Test the resources are empty by default."""
rdd = self.sc.parallelize(range(10))
resources1 = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
# Test using the constructor directly rather than the get()
resources2 = rdd.map(lambda x: TaskContext().resources()).take(1)[0]
self.assertEqual(len(resources1), 0)
self.assertEqual(len(resources2), 0)
def test_partition_id(self):
"""Test the partition id."""
rdd1 = self.sc.parallelize(range(10), 1)
rdd2 = self.sc.parallelize(range(10), 2)
pids1 = rdd1.map(lambda x: TaskContext.get().partitionId()).collect()
pids2 = rdd2.map(lambda x: TaskContext.get().partitionId()).collect()
self.assertEqual(0, pids1[0])
self.assertEqual(0, pids1[9])
self.assertEqual(0, pids2[0])
self.assertEqual(1, pids2[9])
def test_attempt_number(self):
"""Verify the attempt numbers are correctly reported."""
rdd = self.sc.parallelize(range(10))
# Verify a simple job with no failures
attempt_numbers = rdd.map(lambda x: TaskContext.get().attemptNumber()).collect()
        # `map` is lazy in Python 3, so loop explicitly to actually run the assertions.
        for attempt in attempt_numbers:
            self.assertEqual(0, attempt)
def fail_on_first(x):
"""Fail on the first attempt so we get a positive attempt number"""
tc = TaskContext.get()
attempt_number = tc.attemptNumber()
partition_id = tc.partitionId()
attempt_id = tc.taskAttemptId()
if attempt_number == 0 and partition_id == 0:
raise Exception("Failing on first attempt")
else:
return [x, partition_id, attempt_number, attempt_id]
result = rdd.map(fail_on_first).collect()
        # The first partition is re-submitted once, so its attempt number becomes 1,
        # while the other partitions stay at attempt 0.
        self.assertEqual([0, 0, 1], result[0][0:3])
        self.assertEqual([9, 3, 0], result[9][0:3])
        # `filter` and `map` are lazy in Python 3, so iterate explicitly to run the assertions.
        first_partition = [x for x in result if x[1] == 0]
        for x in first_partition:
            self.assertEqual(1, x[2])
        other_partitions = [x for x in result if x[1] != 0]
        for x in other_partitions:
            self.assertEqual(0, x[2])
# The task attempt id should be different
self.assertTrue(result[0][3] != result[9][3])
def test_tc_on_driver(self):
"""Verify that getting the TaskContext on the driver returns None."""
tc = TaskContext.get()
self.assertTrue(tc is None)
def test_get_local_property(self):
"""Verify that local properties set on the driver are available in TaskContext."""
key = "testkey"
value = "testvalue"
self.sc.setLocalProperty(key, value)
try:
rdd = self.sc.parallelize(range(1), 1)
prop1 = rdd.map(lambda _: TaskContext.get().getLocalProperty(key)).collect()[0]
self.assertEqual(prop1, value)
prop2 = rdd.map(lambda _: TaskContext.get().getLocalProperty("otherkey")).collect()[0]
self.assertTrue(prop2 is None)
finally:
self.sc.setLocalProperty(key, None)
def test_barrier(self):
"""
Verify that BarrierTaskContext.barrier() performs global sync among all barrier tasks
within a stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
yield sum(iterator)
def context_barrier(x):
tc = BarrierTaskContext.get()
time.sleep(random.randint(1, 10))
tc.barrier()
return time.time()
times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
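        # Despite the random sleeps, all tasks should leave the barrier at
        # roughly the same time.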
self.assertTrue(max(times) - min(times) < 1)
def test_barrier_infos(self):
"""
Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
barrier stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
yield sum(iterator)
taskInfos = rdd.barrier().mapPartitions(f).map(lambda x: BarrierTaskContext.get()
.getTaskInfos()).collect()
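        # Each of the 4 barrier tasks should see the infos of all 4 tasks in the stage.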
self.assertTrue(len(taskInfos) == 4)
self.assertTrue(len(taskInfos[0]) == 4)
def test_context_get(self):
"""
        Verify that TaskContext.get() works both inside and outside a barrier stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
taskContext = TaskContext.get()
if isinstance(taskContext, BarrierTaskContext):
yield taskContext.partitionId() + 1
elif isinstance(taskContext, TaskContext):
yield taskContext.partitionId() + 2
else:
yield -1
# for normal stage
result1 = rdd.mapPartitions(f).collect()
self.assertTrue(result1 == [2, 3, 4, 5])
# for barrier stage
result2 = rdd.barrier().mapPartitions(f).collect()
self.assertTrue(result2 == [1, 2, 3, 4])
def test_barrier_context_get(self):
"""
        Verify that BarrierTaskContext.get() only works in a barrier stage.
"""
rdd = self.sc.parallelize(range(10), 4)
def f(iterator):
try:
taskContext = BarrierTaskContext.get()
except Exception:
yield -1
else:
yield taskContext.partitionId()
# for normal stage
result1 = rdd.mapPartitions(f).collect()
self.assertTrue(result1 == [-1, -1, -1, -1])
# for barrier stage
result2 = rdd.barrier().mapPartitions(f).collect()
        self.assertTrue(result2 == [0, 1, 2, 3])


class TaskContextTestsWithWorkerReuse(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
conf = SparkConf().set("spark.python.worker.reuse", "true")
self.sc = SparkContext('local[2]', class_name, conf=conf)
def test_barrier_with_python_worker_reuse(self):
"""
        Regression test for SPARK-25921: verify that BarrierTaskContext.barrier() works
        with a reused Python worker.
"""
# start a normal job first to start all workers and get all worker pids
worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()
        # the workers will be reused in this barrier job
rdd = self.sc.parallelize(range(10), 2)
def f(iterator):
yield sum(iterator)
def context_barrier(x):
tc = BarrierTaskContext.get()
time.sleep(random.randint(1, 10))
tc.barrier()
return (time.time(), os.getpid())
result = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
times = list(map(lambda x: x[0], result))
pids = list(map(lambda x: x[1], result))
# check both barrier and worker reuse effect
self.assertTrue(max(times) - min(times) < 1)
for pid in pids:
self.assertTrue(pid in worker_pids)
def test_task_context_correct_with_python_worker_reuse(self):
"""Verify the task context correct when reused python worker"""
# start a normal job first to start all workers and get all worker pids
worker_pids = self.sc.parallelize(xrange(2), 2).map(lambda x: os.getpid()).collect()
        # the workers will be reused in the following normal and barrier stages
rdd = self.sc.parallelize(xrange(10), 2)
def context(iterator):
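            # Yield (TaskContext partition id, BarrierTaskContext partition id or -1,
            # worker pid) for each partition.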
tp = TaskContext.get().partitionId()
try:
bp = BarrierTaskContext.get().partitionId()
except Exception:
bp = -1
yield (tp, bp, os.getpid())
# normal stage after normal stage
normal_result = rdd.mapPartitions(context).collect()
tps, bps, pids = zip(*normal_result)
print(tps)
self.assertTrue(tps == (0, 1))
self.assertTrue(bps == (-1, -1))
for pid in pids:
self.assertTrue(pid in worker_pids)
# barrier stage after normal stage
barrier_result = rdd.barrier().mapPartitions(context).collect()
tps, bps, pids = zip(*barrier_result)
self.assertTrue(tps == (0, 1))
self.assertTrue(bps == (0, 1))
for pid in pids:
self.assertTrue(pid in worker_pids)
# normal stage after barrier stage
normal_result2 = rdd.mapPartitions(context).collect()
tps, bps, pids = zip(*normal_result2)
self.assertTrue(tps == (0, 1))
self.assertTrue(bps == (-1, -1))
for pid in pids:
self.assertTrue(pid in worker_pids)
def tearDown(self):
        self.sc.stop()


class TaskContextTestsWithResources(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
self.tempFile = tempfile.NamedTemporaryFile(delete=False)
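        # Write a fake GPU discovery script that prints a JSON resource spec
        # with a single GPU address.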
self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
self.tempFile.close()
        # Create a unique path for the Worker resources coordination directory by
        # making a named temporary file and deleting it, keeping only its name.
        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
        os.unlink(self.tempdir.name)
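        # Make the fake GPU discovery script executable.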
os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
stat.S_IROTH | stat.S_IXOTH)
conf = SparkConf().set("spark.test.home", SPARK_HOME)
conf = conf.set("spark.resources.dir", self.tempdir.name)
conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name)
conf = conf.set("spark.worker.resource.gpu.amount", 1)
conf = conf.set("spark.task.resource.gpu.amount", "1")
conf = conf.set("spark.executor.resource.gpu.amount", "1")
self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)
def test_resources(self):
"""Test the resources are available."""
rdd = self.sc.parallelize(range(10))
resources = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
self.assertEqual(len(resources), 1)
self.assertTrue('gpu' in resources)
self.assertEqual(resources['gpu'].name, 'gpu')
self.assertEqual(resources['gpu'].addresses, ['0'])
def tearDown(self):
os.unlink(self.tempFile.name)
shutil.rmtree(self.tempdir.name)
        self.sc.stop()


if __name__ == "__main__":
import unittest
from pyspark.tests.test_taskcontext import *
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)