[SPARK-31748][PYTHON] Document resource module in PySpark doc and rename/move classes
### What changes were proposed in this pull request?
This PR is kind of a followup for SPARK-29641 and SPARK-28234. This PR proposes:
1. Document the new `pyspark.resource` module introduced at 95aec091e4, in PySpark API docs.
2. Move classes into fewer and simpler modules
Before:
```
pyspark
├── resource
│ ├── executorrequests.py
│ │ ├── class ExecutorResourceRequest
│ │ └── class ExecutorResourceRequests
│ ├── taskrequests.py
│ │ ├── class TaskResourceRequest
│ │ └── class TaskResourceRequests
│ ├── resourceprofilebuilder.py
│ │ └── class ResourceProfileBuilder
│ ├── resourceprofile.py
│ │ └── class ResourceProfile
└── resourceinformation
└── class ResourceInformation
```
After:
```
pyspark
└── resource
├── requests.py
│ ├── class ExecutorResourceRequest
│ ├── class ExecutorResourceRequests
│ ├── class TaskResourceRequest
│ └── class TaskResourceRequests
├── profile.py
│ ├── class ResourceProfileBuilder
│ └── class ResourceProfile
└── information.py
└── class ResourceInformation
```
3. Minor docstring fix, e.g.:
```diff
- param name the name of the resource
- param addresses an array of strings describing the addresses of the resource
+ :param name: the name of the resource
+ :param addresses: an array of strings describing the addresses of the resource
+
+ .. versionadded:: 3.0.0
```
### Why are the changes needed?
To document APIs, and move Python modules to fewer and simpler modules.
### Does this PR introduce _any_ user-facing change?
No, the changes are in unreleased branches.
### How was this patch tested?
Manually tested via:
```bash
cd python
./run-tests --python-executables=python3 --modules=pyspark-core
./run-tests --python-executables=python3 --modules=pyspark-resource
```
Closes #28569 from HyukjinKwon/SPARK-28234-SPARK-29641-followup.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
1f29f1ba58
commit
6fb22aa42d
|
@ -16,6 +16,7 @@ Contents:
|
|||
pyspark.streaming
|
||||
pyspark.ml
|
||||
pyspark.mllib
|
||||
pyspark.resource
|
||||
|
||||
|
||||
Core classes:
|
||||
|
|
11
python/docs/pyspark.resource.rst
Normal file
11
python/docs/pyspark.resource.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
pyspark.resource module
|
||||
=======================
|
||||
|
||||
Module Contents
|
||||
---------------
|
||||
|
||||
.. automodule:: pyspark.resource
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
|
@ -11,6 +11,7 @@ Subpackages
|
|||
pyspark.streaming
|
||||
pyspark.ml
|
||||
pyspark.mllib
|
||||
pyspark.resource
|
||||
|
||||
Contents
|
||||
--------
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
pyspark.sql module
|
||||
==================
|
||||
|
||||
Module Context
|
||||
--------------
|
||||
Module Contents
|
||||
---------------
|
||||
|
||||
.. automodule:: pyspark.sql
|
||||
:members:
|
||||
|
|
|
@ -54,7 +54,7 @@ from pyspark.files import SparkFiles
|
|||
from pyspark.storagelevel import StorageLevel
|
||||
from pyspark.accumulators import Accumulator, AccumulatorParam
|
||||
from pyspark.broadcast import Broadcast
|
||||
from pyspark.resourceinformation import ResourceInformation
|
||||
from pyspark.resource.information import ResourceInformation
|
||||
from pyspark.serializers import MarshalSerializer, PickleSerializer
|
||||
from pyspark.status import *
|
||||
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
|
||||
|
|
|
@ -35,7 +35,7 @@ from pyspark.java_gateway import launch_gateway, local_connect_and_auth
|
|||
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
|
||||
PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
|
||||
from pyspark.storagelevel import StorageLevel
|
||||
from pyspark.resourceinformation import ResourceInformation
|
||||
from pyspark.resource.information import ResourceInformation
|
||||
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
|
||||
from pyspark.traceback_utils import CallSite, first_spark_call
|
||||
from pyspark.status import StatusTracker
|
||||
|
|
|
@ -47,9 +47,8 @@ from pyspark.join import python_join, python_left_outer_join, \
|
|||
from pyspark.statcounter import StatCounter
|
||||
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
|
||||
from pyspark.storagelevel import StorageLevel
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequests
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
from pyspark.resource.taskrequests import TaskResourceRequests
|
||||
from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
|
||||
from pyspark.resource.profile import ResourceProfile
|
||||
from pyspark.resultiterable import ResultIterable
|
||||
from pyspark.shuffle import Aggregator, ExternalMerger, \
|
||||
get_used_memory, ExternalSorter, ExternalGroupBy
|
||||
|
|
|
@ -18,12 +18,13 @@
|
|||
"""
|
||||
APIs to let users manipulate resource requirements.
|
||||
"""
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
|
||||
from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
from pyspark.resource.information import ResourceInformation
|
||||
from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
|
||||
ExecutorResourceRequest, ExecutorResourceRequests
|
||||
from pyspark.resource.profile import ResourceProfile, ResourceProfileBuilder
|
||||
|
||||
__all__ = [
|
||||
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
|
||||
"ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
|
||||
"ExecutorResourceRequests", "ResourceProfile", "ResourceInformation",
|
||||
"ResourceProfileBuilder",
|
||||
]
|
||||
|
|
|
@ -26,8 +26,10 @@ class ResourceInformation(object):
|
|||
|
||||
One example is GPUs, where the addresses would be the indices of the GPUs
|
||||
|
||||
@param name the name of the resource
|
||||
@param addresses an array of strings describing the addresses of the resource
|
||||
:param name: the name of the resource
|
||||
:param addresses: an array of strings describing the addresses of the resource
|
||||
|
||||
.. versionadded:: 3.0.0
|
||||
"""
|
||||
|
||||
def __init__(self, name, addresses):
|
|
@ -15,10 +15,61 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest,\
|
||||
ExecutorResourceRequests
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
|
||||
from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
|
||||
ExecutorResourceRequests, ExecutorResourceRequest
|
||||
|
||||
|
||||
class ResourceProfile(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
|
||||
allows the user to specify executor and task requirements for an RDD that will get
|
||||
applied during a stage. This allows the user to change the resource requirements between
|
||||
stages. This is meant to be immutable so user cannot change it after building.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
|
||||
if _java_resource_profile is not None:
|
||||
self._java_resource_profile = _java_resource_profile
|
||||
else:
|
||||
self._java_resource_profile = None
|
||||
self._executor_resource_requests = _exec_req
|
||||
self._task_resource_requests = _task_req
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
if self._java_resource_profile is not None:
|
||||
return self._java_resource_profile.id()
|
||||
else:
|
||||
raise RuntimeError("SparkContext must be created to get the id, get the id "
|
||||
"after adding the ResourceProfile to an RDD")
|
||||
|
||||
@property
|
||||
def taskResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
taskRes = self._java_resource_profile.taskResourcesJMap()
|
||||
result = {}
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resource_requests
|
||||
|
||||
@property
|
||||
def executorResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
execRes = self._java_resource_profile.executorResourcesJMap()
|
||||
result = {}
|
||||
for k, v in execRes.items():
|
||||
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
|
||||
v.discoveryScript(), v.vendor())
|
||||
return result
|
||||
else:
|
||||
return self._executor_resource_requests
|
||||
|
||||
|
||||
class ResourceProfileBuilder(object):
|
|
@ -15,7 +15,6 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest
|
||||
from pyspark.util import _parse_memory
|
||||
|
||||
|
||||
|
@ -167,3 +166,89 @@ class ExecutorResourceRequests(object):
|
|||
return result
|
||||
else:
|
||||
return self._executor_resources
|
||||
|
||||
|
||||
class TaskResourceRequest(object):
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A task resource request. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level. The amount is specified
|
||||
as a Double to allow for saying you want more than 1 task per resource. Valid values
|
||||
are less than or equal to 0.5 or whole numbers.
|
||||
Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
|
||||
|
||||
:param resourceName: Name of the resource
|
||||
:param amount: Amount requesting as a Double to support fractional resource requests.
|
||||
Valid values are less than or equal to 0.5 or whole numbers.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
def __init__(self, resourceName, amount):
|
||||
self._name = resourceName
|
||||
self._amount = float(amount)
|
||||
|
||||
@property
|
||||
def resourceName(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def amount(self):
|
||||
return self._amount
|
||||
|
||||
|
||||
class TaskResourceRequests(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A set of task resource requests. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
_CPUS = "cpus"
|
||||
|
||||
def __init__(self, _jvm=None, _requests=None):
|
||||
from pyspark import SparkContext
|
||||
_jvm = _jvm or SparkContext._jvm
|
||||
if _jvm is not None:
|
||||
self._java_task_resource_requests = \
|
||||
SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
|
||||
if _requests is not None:
|
||||
for k, v in _requests.items():
|
||||
if k == self._CPUS:
|
||||
self._java_task_resource_requests.cpus(int(v.amount))
|
||||
else:
|
||||
self._java_task_resource_requests.resource(v.resourceName, v.amount)
|
||||
else:
|
||||
self._java_task_resource_requests = None
|
||||
self._task_resources = {}
|
||||
|
||||
def cpus(self, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.cpus(amount)
|
||||
else:
|
||||
self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
|
||||
return self
|
||||
|
||||
def resource(self, resourceName, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.resource(resourceName, float(amount))
|
||||
else:
|
||||
self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
|
||||
return self
|
||||
|
||||
@property
|
||||
def requests(self):
|
||||
if self._java_task_resource_requests is not None:
|
||||
result = {}
|
||||
taskRes = self._java_task_resource_requests.requestsJMap()
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resources
|
|
@ -1,72 +0,0 @@
|
|||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest
|
||||
|
||||
|
||||
class ResourceProfile(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
|
||||
allows the user to specify executor and task requirements for an RDD that will get
|
||||
applied during a stage. This allows the user to change the resource requirements between
|
||||
stages. This is meant to be immutable so user doesn't change it after building.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
|
||||
if _java_resource_profile is not None:
|
||||
self._java_resource_profile = _java_resource_profile
|
||||
else:
|
||||
self._java_resource_profile = None
|
||||
self._executor_resource_requests = _exec_req
|
||||
self._task_resource_requests = _task_req
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
if self._java_resource_profile is not None:
|
||||
return self._java_resource_profile.id()
|
||||
else:
|
||||
raise RuntimeError("SparkContext must be created to get the id, get the id "
|
||||
"after adding the ResourceProfile to an RDD")
|
||||
|
||||
@property
|
||||
def taskResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
taskRes = self._java_resource_profile.taskResourcesJMap()
|
||||
result = {}
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resource_requests
|
||||
|
||||
@property
|
||||
def executorResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
execRes = self._java_resource_profile.executorResourcesJMap()
|
||||
result = {}
|
||||
for k, v in execRes.items():
|
||||
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
|
||||
v.discoveryScript(), v.vendor())
|
||||
return result
|
||||
else:
|
||||
return self._executor_resource_requests
|
|
@ -1,102 +0,0 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
class TaskResourceRequest(object):
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A task resource request. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level. The amount is specified
|
||||
as a Double to allow for saying you want more then 1 task per resource. Valid values
|
||||
are less than or equal to 0.5 or whole numbers.
|
||||
Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
|
||||
|
||||
:param resourceName: Name of the resource
|
||||
:param amount: Amount requesting as a Double to support fractional resource requests.
|
||||
Valid values are less than or equal to 0.5 or whole numbers.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
def __init__(self, resourceName, amount):
|
||||
self._name = resourceName
|
||||
self._amount = float(amount)
|
||||
|
||||
@property
|
||||
def resourceName(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def amount(self):
|
||||
return self._amount
|
||||
|
||||
|
||||
class TaskResourceRequests(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A set of task resource requests. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
_CPUS = "cpus"
|
||||
|
||||
def __init__(self, _jvm=None, _requests=None):
|
||||
from pyspark import SparkContext
|
||||
_jvm = _jvm or SparkContext._jvm
|
||||
if _jvm is not None:
|
||||
self._java_task_resource_requests = \
|
||||
SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
|
||||
if _requests is not None:
|
||||
for k, v in _requests.items():
|
||||
if k == self._CPUS:
|
||||
self._java_task_resource_requests.cpus(int(v.amount))
|
||||
else:
|
||||
self._java_task_resource_requests.resource(v.resourceName, v.amount)
|
||||
else:
|
||||
self._java_task_resource_requests = None
|
||||
self._task_resources = {}
|
||||
|
||||
def cpus(self, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.cpus(amount)
|
||||
else:
|
||||
self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
|
||||
return self
|
||||
|
||||
def resource(self, resourceName, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.resource(resourceName, float(amount))
|
||||
else:
|
||||
self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
|
||||
return self
|
||||
|
||||
@property
|
||||
def requests(self):
|
||||
if self._java_task_resource_requests is not None:
|
||||
result = {}
|
||||
taskRes = self._java_task_resource_requests.requestsJMap()
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resources
|
|
@ -36,7 +36,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
|
|||
from pyspark.java_gateway import local_connect_and_auth
|
||||
from pyspark.taskcontext import BarrierTaskContext, TaskContext
|
||||
from pyspark.files import SparkFiles
|
||||
from pyspark.resourceinformation import ResourceInformation
|
||||
from pyspark.resource import ResourceInformation
|
||||
from pyspark.rdd import PythonEvalType
|
||||
from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
|
||||
write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
|
||||
|
|
Loading…
Reference in a new issue