[SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests
### What changes were proposed in this pull request? As part of the Stage level scheduling features, add the Python api's to set resource profiles. This also adds the functionality to properly apply the pyspark memory configuration when specified in the ResourceProfile. The pyspark memory configuration is being passed in the task local properties. This was an easy way to get it to the PythonRunner that needs it. I modeled this off how the barrier task scheduling is passing the addresses. As part of this I added in the JavaRDD api's because those are needed by python. ### Why are the changes needed? python api for this feature ### Does this PR introduce any user-facing change? Yes adds the java and python apis for user to specify a ResourceProfile to use stage level scheduling. ### How was this patch tested? unit tests and manually tested on yarn. Tests also run to verify it errors properly on standalone and local mode where its not yet supported. Closes #28085 from tgravescs/SPARK-29641-pr-base. Lead-authored-by: Thomas Graves <tgraves@nvidia.com> Co-authored-by: Thomas Graves <tgraves@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
c619990c1d
commit
95aec091e4
|
@ -27,6 +27,7 @@ import org.apache.spark._
|
|||
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
|
||||
import org.apache.spark.api.java.function.{Function => JFunction}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.resource.ResourceProfile
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
@ -49,6 +50,20 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
|
|||
*/
|
||||
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
|
||||
|
||||
/**
|
||||
* Specify a ResourceProfile to use when calculating this RDD. This is only supported on
|
||||
* certain cluster managers and currently requires dynamic allocation to be enabled.
|
||||
* It will result in new executors with the resources specified being acquired to
|
||||
* calculate the RDD.
|
||||
*/
|
||||
def withResources(rp: ResourceProfile): JavaRDD[T] = wrapRDD(rdd.withResources(rp))
|
||||
|
||||
/**
|
||||
* Get the ResourceProfile specified with this RDD or None if it wasn't specified.
|
||||
* @return the user specified ResourceProfile or null if none was specified
|
||||
*/
|
||||
def getResourceProfile(): ResourceProfile = rdd.getResourceProfile()
|
||||
|
||||
/**
|
||||
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
|
||||
* This method blocks until all blocks are deleted.
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.spark._
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
|
||||
import org.apache.spark.internal.config.Python._
|
||||
import org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
|
||||
import org.apache.spark.security.SocketAuthHelper
|
||||
import org.apache.spark.util._
|
||||
|
||||
|
@ -85,9 +86,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
|
|||
private val conf = SparkEnv.get.conf
|
||||
protected val bufferSize: Int = conf.get(BUFFER_SIZE)
|
||||
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
|
||||
// each python worker gets an equal part of the allocation. the worker pool will grow to the
|
||||
// number of concurrent tasks, which is determined by the number of cores in this executor.
|
||||
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))
|
||||
|
||||
// All the Python functions should have the same exec, version and envvars.
|
||||
protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
|
||||
|
@ -106,26 +104,41 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
|
|||
// Authentication helper used when serving method calls via socket from Python side.
|
||||
private lazy val authHelper = new SocketAuthHelper(conf)
|
||||
|
||||
// each python worker gets an equal part of the allocation. the worker pool will grow to the
|
||||
// number of concurrent tasks, which is determined by the number of cores in this executor.
|
||||
private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = {
|
||||
mem.map(_ / cores)
|
||||
}
|
||||
|
||||
def compute(
|
||||
inputIterator: Iterator[IN],
|
||||
partitionIndex: Int,
|
||||
context: TaskContext): Iterator[OUT] = {
|
||||
val startTime = System.currentTimeMillis
|
||||
val env = SparkEnv.get
|
||||
|
||||
// Get the executor cores and pyspark memory, they are passed via the local properties when
|
||||
// the user specified them in a ResourceProfile.
|
||||
val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
|
||||
val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
|
||||
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
|
||||
// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
|
||||
if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
|
||||
// SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
|
||||
// this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
|
||||
// see https://github.com/numpy/numpy/issues/10455
|
||||
conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
|
||||
execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
|
||||
}
|
||||
envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
|
||||
if (reuseWorker) {
|
||||
envVars.put("SPARK_REUSE_WORKER", "1")
|
||||
}
|
||||
if (memoryMb.isDefined) {
|
||||
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
|
||||
// SPARK-30299 this could be wrong with standalone mode when executor
|
||||
// cores might not be correct because it defaults to all cores on the box.
|
||||
val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES))
|
||||
val workerMemoryMb = getWorkerMemoryMb(memoryMb, execCores)
|
||||
if (workerMemoryMb.isDefined) {
|
||||
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
|
||||
}
|
||||
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
|
||||
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.spark.resource
|
||||
|
||||
import java.util.{Map => JMap}
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
@ -38,6 +39,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
|
|||
|
||||
def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
|
||||
|
||||
def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava
|
||||
|
||||
/**
|
||||
* Specify heap memory. The value specified will be converted to MiB.
|
||||
*
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
|
||||
|
@ -76,6 +75,10 @@ class ResourceProfile(
|
|||
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
|
||||
}
|
||||
|
||||
private[spark] def getPySparkMemory: Option[Long] = {
|
||||
executorResources.get(ResourceProfile.PYSPARK_MEM).map(_.amount.toLong)
|
||||
}
|
||||
|
||||
/*
|
||||
* This function takes into account fractional amounts for the task resource requirement.
|
||||
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
|
||||
|
@ -330,4 +333,7 @@ object ResourceProfile extends Logging {
|
|||
private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: SparkConf): Int = {
|
||||
rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
|
||||
}
|
||||
|
||||
private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
|
||||
private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.spark.resource
|
||||
|
||||
import java.util.{Map => JMap}
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
@ -37,6 +38,8 @@ private[spark] class TaskResourceRequests() extends Serializable {
|
|||
|
||||
def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
|
||||
|
||||
def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava
|
||||
|
||||
/**
|
||||
* Specify number of cpus per Task.
|
||||
*
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.spark.network.util.JavaUtils
|
|||
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
|
||||
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
|
||||
import org.apache.spark.resource.ResourceProfile
|
||||
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
|
||||
import org.apache.spark.rpc.RpcTimeout
|
||||
import org.apache.spark.storage._
|
||||
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
|
||||
|
@ -1175,6 +1176,27 @@ private[spark] class DAGScheduler(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* `PythonRunner` needs to know what the pyspark memory and cores settings are for the profile
|
||||
* being run. Pass them in the local properties of the task if it's set for the stage profile.
|
||||
*/
|
||||
private def addPySparkConfigsToProperties(stage: Stage, properties: Properties): Unit = {
|
||||
val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
|
||||
val pysparkMem = rp.getPySparkMemory
|
||||
// use the getOption on EXECUTOR_CORES.key instead of using the EXECUTOR_CORES config reader
|
||||
// because the default for this config isn't correct for standalone mode. Here we want
|
||||
// to know if it was explicitly set or not. The default profile always has it set to either
|
||||
// what user specified or default so special case it here.
|
||||
val execCores = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
|
||||
sc.conf.getOption(config.EXECUTOR_CORES.key)
|
||||
} else {
|
||||
val profCores = rp.getExecutorCores.map(_.toString)
|
||||
if (profCores.isEmpty) sc.conf.getOption(config.EXECUTOR_CORES.key) else profCores
|
||||
}
|
||||
pysparkMem.map(mem => properties.setProperty(PYSPARK_MEMORY_LOCAL_PROPERTY, mem.toString))
|
||||
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
|
||||
}
|
||||
|
||||
/** Called when stage's parents are available and we can now do its task. */
|
||||
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
|
||||
logDebug("submitMissingTasks(" + stage + ")")
|
||||
|
@ -1194,6 +1216,7 @@ private[spark] class DAGScheduler(
|
|||
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
|
||||
// with this Stage
|
||||
val properties = jobIdToActiveJob(jobId).properties
|
||||
addPySparkConfigsToProperties(stage, properties)
|
||||
|
||||
runningStages += stage
|
||||
// SparkListenerStageSubmitted should be posted before testing whether tasks are
|
||||
|
|
|
@ -73,6 +73,10 @@ import org.apache.spark.input.PortableDataStream;
|
|||
import org.apache.spark.partial.BoundedDouble;
|
||||
import org.apache.spark.partial.PartialResult;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.resource.ExecutorResourceRequests;
|
||||
import org.apache.spark.resource.ResourceProfile;
|
||||
import org.apache.spark.resource.ResourceProfileBuilder;
|
||||
import org.apache.spark.resource.TaskResourceRequests;
|
||||
import org.apache.spark.serializer.KryoSerializer;
|
||||
import org.apache.spark.storage.StorageLevel;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
@ -897,6 +901,16 @@ public class JavaAPISuite implements Serializable {
|
|||
assertEquals(1, rdd.first().intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void withResources() {
|
||||
ExecutorResourceRequests ereqs = new ExecutorResourceRequests().cores(4);
|
||||
TaskResourceRequests treqs = new TaskResourceRequests().cpus(1);
|
||||
ResourceProfile rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build();
|
||||
JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
|
||||
in1.withResources(rp1);
|
||||
assertEquals(rp1, in1.getResourceProfile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void iterator() {
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
|
||||
|
|
|
@ -398,6 +398,21 @@ pyspark_sql = Module(
|
|||
)
|
||||
|
||||
|
||||
pyspark_resource = Module(
|
||||
name="pyspark-resource",
|
||||
dependencies=[
|
||||
pyspark_core
|
||||
],
|
||||
source_file_regexes=[
|
||||
"python/pyspark/resource"
|
||||
],
|
||||
python_test_goals=[
|
||||
# unittests
|
||||
"pyspark.resource.tests.test_resources",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
pyspark_streaming = Module(
|
||||
name="pyspark-streaming",
|
||||
dependencies=[
|
||||
|
|
|
@ -152,6 +152,7 @@ def launch_gateway(conf=None, popen_kwargs=None):
|
|||
java_import(gateway.jvm, "org.apache.spark.api.python.*")
|
||||
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
|
||||
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
|
||||
java_import(gateway.jvm, "org.apache.spark.resource.*")
|
||||
# TODO(davies): move into sql
|
||||
java_import(gateway.jvm, "org.apache.spark.sql.*")
|
||||
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
|
||||
|
|
|
@ -47,11 +47,14 @@ from pyspark.join import python_join, python_left_outer_join, \
|
|||
from pyspark.statcounter import StatCounter
|
||||
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
|
||||
from pyspark.storagelevel import StorageLevel
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequests
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
from pyspark.resource.taskrequests import TaskResourceRequests
|
||||
from pyspark.resultiterable import ResultIterable
|
||||
from pyspark.shuffle import Aggregator, ExternalMerger, \
|
||||
get_used_memory, ExternalSorter, ExternalGroupBy
|
||||
from pyspark.traceback_utils import SCCallSiteSync
|
||||
from pyspark.util import fail_on_stopiteration
|
||||
from pyspark.util import fail_on_stopiteration, _parse_memory
|
||||
|
||||
|
||||
__all__ = ["RDD"]
|
||||
|
@ -125,22 +128,6 @@ class BoundedFloat(float):
|
|||
return obj
|
||||
|
||||
|
||||
def _parse_memory(s):
|
||||
"""
|
||||
Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
|
||||
return the value in MiB
|
||||
|
||||
>>> _parse_memory("256m")
|
||||
256
|
||||
>>> _parse_memory("2g")
|
||||
2048
|
||||
"""
|
||||
units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
|
||||
if s[-1].lower() not in units:
|
||||
raise ValueError("invalid format: " + s)
|
||||
return int(float(s[:-1]) * units[s[-1].lower()])
|
||||
|
||||
|
||||
def _create_local_socket(sock_info):
|
||||
"""
|
||||
Create a local socket that can be used to load deserialized data from the JVM
|
||||
|
@ -256,6 +243,7 @@ class RDD(object):
|
|||
self._jrdd = jrdd
|
||||
self.is_cached = False
|
||||
self.is_checkpointed = False
|
||||
self.has_resource_profile = False
|
||||
self.ctx = ctx
|
||||
self._jrdd_deserializer = jrdd_deserializer
|
||||
self._id = jrdd.id()
|
||||
|
@ -2483,6 +2471,47 @@ class RDD(object):
|
|||
"""
|
||||
return self._jrdd.rdd().isBarrier()
|
||||
|
||||
def withResources(self, profile):
|
||||
"""
|
||||
.. note:: Experimental
|
||||
|
||||
Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD.
|
||||
This is only supported on certain cluster managers and currently requires dynamic
|
||||
allocation to be enabled. It will result in new executors with the resources specified
|
||||
being acquired to calculate the RDD.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
self.has_resource_profile = True
|
||||
if profile._java_resource_profile is not None:
|
||||
jrp = profile._java_resource_profile
|
||||
else:
|
||||
builder = self.ctx._jvm.org.apache.spark.resource.ResourceProfileBuilder()
|
||||
ereqs = ExecutorResourceRequests(self.ctx._jvm, profile._executor_resource_requests)
|
||||
treqs = TaskResourceRequests(self.ctx._jvm, profile._task_resource_requests)
|
||||
builder.require(ereqs._java_executor_resource_requests)
|
||||
builder.require(treqs._java_task_resource_requests)
|
||||
jrp = builder.build()
|
||||
|
||||
self._jrdd.withResources(jrp)
|
||||
return self
|
||||
|
||||
def getResourceProfile(self):
|
||||
"""
|
||||
.. note:: Experimental
|
||||
|
||||
Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None
|
||||
if it wasn't specified.
|
||||
:return: the user specified ResourceProfile or None if none were specified
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
rp = self._jrdd.getResourceProfile()
|
||||
if rp is not None:
|
||||
return ResourceProfile(_java_resource_profile=rp)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _prepare_for_python_RDD(sc, command):
|
||||
# the serialized command will be compressed by broadcast
|
||||
|
@ -2587,6 +2616,7 @@ class PipelinedRDD(RDD):
|
|||
self._prev_jrdd = prev._prev_jrdd # maintain the pipeline
|
||||
self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
|
||||
self.is_cached = False
|
||||
self.has_resource_profile = False
|
||||
self.is_checkpointed = False
|
||||
self.ctx = prev.ctx
|
||||
self.prev = prev
|
||||
|
@ -2629,7 +2659,7 @@ class PipelinedRDD(RDD):
|
|||
return self._id
|
||||
|
||||
def _is_pipelinable(self):
|
||||
return not (self.is_cached or self.is_checkpointed)
|
||||
return not (self.is_cached or self.is_checkpointed or self.has_resource_profile)
|
||||
|
||||
def _is_barrier(self):
|
||||
return self.is_barrier
|
||||
|
|
29
python/pyspark/resource/__init__.py
Normal file
29
python/pyspark/resource/__init__.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
"""
|
||||
APIs to let users manipulate resource requirements.
|
||||
"""
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
|
||||
from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
|
||||
__all__ = [
|
||||
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
|
||||
"ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
|
||||
]
|
169
python/pyspark/resource/executorrequests.py
Normal file
169
python/pyspark/resource/executorrequests.py
Normal file
|
@ -0,0 +1,169 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest
|
||||
from pyspark.util import _parse_memory
|
||||
|
||||
|
||||
class ExecutorResourceRequest(object):
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
An Executor resource request. This is used in conjunction with the ResourceProfile to
|
||||
programmatically specify the resources needed for an RDD that will be applied at the
|
||||
stage level.
|
||||
|
||||
This is used to specify what the resource requirements are for an Executor and how
|
||||
Spark can find out specific details about those resources. Not all the parameters are
|
||||
required for every resource type. Resources like GPUs are supported and have same limitations
|
||||
as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript,
|
||||
and vendor parameters for resources are all the same parameters a user would specify through the
|
||||
configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
|
||||
|
||||
For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
|
||||
to specify the resource name (gpu), the amount or number of GPUs per Executor,
|
||||
the discovery script would be specified so that when the Executor starts up it can
|
||||
discovery what GPU addresses are available for it to use because YARN doesn't tell
|
||||
Spark that, then vendor would not be used because its specific for Kubernetes.
|
||||
|
||||
See the configuration and cluster specific docs for more details.
|
||||
|
||||
Use `pyspark.ExecutorResourceRequests` class as a convenience API.
|
||||
|
||||
:param resourceName: Name of the resource
|
||||
:param amount: Amount requesting
|
||||
:param discoveryScript: Optional script used to discover the resources. This is required on some
|
||||
cluster managers that don't tell Spark the addresses of the resources
|
||||
allocated. The script runs on Executors startup to discover the addresses
|
||||
of the resources available.
|
||||
:param vendor: Vendor, required for some cluster managers
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
def __init__(self, resourceName, amount, discoveryScript="", vendor=""):
|
||||
self._name = resourceName
|
||||
self._amount = amount
|
||||
self._discovery_script = discoveryScript
|
||||
self._vendor = vendor
|
||||
|
||||
@property
|
||||
def resourceName(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def amount(self):
|
||||
return self._amount
|
||||
|
||||
@property
|
||||
def discoveryScript(self):
|
||||
return self._discovery_script
|
||||
|
||||
@property
|
||||
def vendor(self):
|
||||
return self._vendor
|
||||
|
||||
|
||||
class ExecutorResourceRequests(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A set of Executor resource requests. This is used in conjunction with the
|
||||
:class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the
|
||||
resources needed for an RDD that will be applied at the stage level.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
_CORES = "cores"
|
||||
_MEMORY = "memory"
|
||||
_OVERHEAD_MEM = "memoryOverhead"
|
||||
_PYSPARK_MEM = "pyspark.memory"
|
||||
|
||||
def __init__(self, _jvm=None, _requests=None):
|
||||
from pyspark import SparkContext
|
||||
_jvm = _jvm or SparkContext._jvm
|
||||
if _jvm is not None:
|
||||
self._java_executor_resource_requests = \
|
||||
_jvm.org.apache.spark.resource.ExecutorResourceRequests()
|
||||
if _requests is not None:
|
||||
for k, v in _requests.items():
|
||||
if k == self._MEMORY:
|
||||
self._java_executor_resource_requests.memory(str(v.amount))
|
||||
elif k == self._OVERHEAD_MEM:
|
||||
self._java_executor_resource_requests.memoryOverhead(str(v.amount))
|
||||
elif k == self._PYSPARK_MEM:
|
||||
self._java_executor_resource_requests.pysparkMemory(str(v.amount))
|
||||
elif k == self._CORES:
|
||||
self._java_executor_resource_requests.cores(v.amount)
|
||||
else:
|
||||
self._java_executor_resource_requests.resource(v.resourceName, v.amount,
|
||||
v.discoveryScript, v.vendor)
|
||||
else:
|
||||
self._java_executor_resource_requests = None
|
||||
self._executor_resources = {}
|
||||
|
||||
def memory(self, amount):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
self._java_executor_resource_requests.memory(amount)
|
||||
else:
|
||||
self._executor_resources[self._MEMORY] = ExecutorResourceRequest(self._MEMORY,
|
||||
_parse_memory(amount))
|
||||
return self
|
||||
|
||||
def memoryOverhead(self, amount):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
self._java_executor_resource_requests.memoryOverhead(amount)
|
||||
else:
|
||||
self._executor_resources[self._OVERHEAD_MEM] = \
|
||||
ExecutorResourceRequest(self._OVERHEAD_MEM, _parse_memory(amount))
|
||||
return self
|
||||
|
||||
def pysparkMemory(self, amount):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
self._java_executor_resource_requests.pysparkMemory(amount)
|
||||
else:
|
||||
self._executor_resources[self._PYSPARK_MEM] = \
|
||||
ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
|
||||
return self
|
||||
|
||||
def cores(self, amount):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
self._java_executor_resource_requests.cores(amount)
|
||||
else:
|
||||
self._executor_resources[self._CORES] = ExecutorResourceRequest(self._CORES, amount)
|
||||
return self
|
||||
|
||||
def resource(self, resourceName, amount, discoveryScript="", vendor=""):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
self._java_executor_resource_requests.resource(resourceName, amount, discoveryScript,
|
||||
vendor)
|
||||
else:
|
||||
self._executor_resources[resourceName] = \
|
||||
ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
|
||||
return self
|
||||
|
||||
@property
|
||||
def requests(self):
|
||||
if self._java_executor_resource_requests is not None:
|
||||
result = {}
|
||||
execRes = self._java_executor_resource_requests.requestsJMap()
|
||||
for k, v in execRes.items():
|
||||
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
|
||||
v.discoveryScript(), v.vendor())
|
||||
return result
|
||||
else:
|
||||
return self._executor_resources
|
72
python/pyspark/resource/resourceprofile.py
Normal file
72
python/pyspark/resource/resourceprofile.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest
|
||||
|
||||
|
||||
class ResourceProfile(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
|
||||
allows the user to specify executor and task requirements for an RDD that will get
|
||||
applied during a stage. This allows the user to change the resource requirements between
|
||||
stages. This is meant to be immutable so user doesn't change it after building.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
|
||||
if _java_resource_profile is not None:
|
||||
self._java_resource_profile = _java_resource_profile
|
||||
else:
|
||||
self._java_resource_profile = None
|
||||
self._executor_resource_requests = _exec_req
|
||||
self._task_resource_requests = _task_req
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
if self._java_resource_profile is not None:
|
||||
return self._java_resource_profile.id()
|
||||
else:
|
||||
raise RuntimeError("SparkContext must be created to get the id, get the id "
|
||||
"after adding the ResourceProfile to an RDD")
|
||||
|
||||
@property
|
||||
def taskResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
taskRes = self._java_resource_profile.taskResourcesJMap()
|
||||
result = {}
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resource_requests
|
||||
|
||||
@property
|
||||
def executorResources(self):
|
||||
if self._java_resource_profile is not None:
|
||||
execRes = self._java_resource_profile.executorResourcesJMap()
|
||||
result = {}
|
||||
for k, v in execRes.items():
|
||||
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
|
||||
v.discoveryScript(), v.vendor())
|
||||
return result
|
||||
else:
|
||||
return self._executor_resource_requests
|
117
python/pyspark/resource/resourceprofilebuilder.py
Normal file
117
python/pyspark/resource/resourceprofilebuilder.py
Normal file
|
@ -0,0 +1,117 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from pyspark.resource.executorrequests import ExecutorResourceRequest,\
|
||||
ExecutorResourceRequests
|
||||
from pyspark.resource.resourceprofile import ResourceProfile
|
||||
from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
|
||||
|
||||
|
||||
class ResourceProfileBuilder(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
Resource profile Builder to build a resource profile to associate with an RDD.
|
||||
A ResourceProfile allows the user to specify executor and task requirements for
|
||||
an RDD that will get applied during a stage. This allows the user to change the
|
||||
resource requirements between stages.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
from pyspark.context import SparkContext
|
||||
_jvm = SparkContext._jvm
|
||||
if _jvm is not None:
|
||||
self._jvm = _jvm
|
||||
self._java_resource_profile_builder = \
|
||||
_jvm.org.apache.spark.resource.ResourceProfileBuilder()
|
||||
else:
|
||||
self._jvm = None
|
||||
self._java_resource_profile_builder = None
|
||||
self._executor_resource_requests = {}
|
||||
self._task_resource_requests = {}
|
||||
|
||||
def require(self, resourceRequest):
|
||||
if isinstance(resourceRequest, TaskResourceRequests):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
if resourceRequest._java_task_resource_requests is not None:
|
||||
self._java_resource_profile_builder.require(
|
||||
resourceRequest._java_task_resource_requests)
|
||||
else:
|
||||
taskReqs = TaskResourceRequests(self._jvm, resourceRequest.requests)
|
||||
self._java_resource_profile_builder.require(
|
||||
taskReqs._java_task_resource_requests)
|
||||
else:
|
||||
self._task_resource_requests.update(resourceRequest.requests)
|
||||
else:
|
||||
if self._java_resource_profile_builder is not None:
|
||||
if resourceRequest._java_executor_resource_requests is not None:
|
||||
self._java_resource_profile_builder.require(
|
||||
resourceRequest._java_executor_resource_requests)
|
||||
else:
|
||||
execReqs = ExecutorResourceRequests(self._jvm, resourceRequest.requests)
|
||||
self._java_resource_profile_builder.require(
|
||||
execReqs._java_executor_resource_requests)
|
||||
else:
|
||||
self._executor_resource_requests.update(resourceRequest.requests)
|
||||
return self
|
||||
|
||||
def clearExecutorResourceRequests(self):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
self._java_resource_profile_builder.clearExecutorResourceRequests()
|
||||
else:
|
||||
self._executor_resource_requests = {}
|
||||
|
||||
def clearTaskResourceRequests(self):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
self._java_resource_profile_builder.clearTaskResourceRequests()
|
||||
else:
|
||||
self._task_resource_requests = {}
|
||||
|
||||
@property
|
||||
def taskResources(self):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
taskRes = self._java_resource_profile_builder.taskResourcesJMap()
|
||||
result = {}
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resource_requests
|
||||
|
||||
@property
|
||||
def executorResources(self):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
result = {}
|
||||
execRes = self._java_resource_profile_builder.executorResourcesJMap()
|
||||
for k, v in execRes.items():
|
||||
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
|
||||
v.discoveryScript(), v.vendor())
|
||||
return result
|
||||
else:
|
||||
return self._executor_resource_requests
|
||||
|
||||
@property
|
||||
def build(self):
|
||||
if self._java_resource_profile_builder is not None:
|
||||
jresourceProfile = self._java_resource_profile_builder.build()
|
||||
return ResourceProfile(_java_resource_profile=jresourceProfile)
|
||||
else:
|
||||
return ResourceProfile(_exec_req=self._executor_resource_requests,
|
||||
_task_req=self._task_resource_requests)
|
102
python/pyspark/resource/taskrequests.py
Normal file
102
python/pyspark/resource/taskrequests.py
Normal file
|
@ -0,0 +1,102 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
class TaskResourceRequest(object):
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A task resource request. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level. The amount is specified
|
||||
as a Double to allow for saying you want more then 1 task per resource. Valid values
|
||||
are less than or equal to 0.5 or whole numbers.
|
||||
Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
|
||||
|
||||
:param resourceName: Name of the resource
|
||||
:param amount: Amount requesting as a Double to support fractional resource requests.
|
||||
Valid values are less than or equal to 0.5 or whole numbers.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
def __init__(self, resourceName, amount):
|
||||
self._name = resourceName
|
||||
self._amount = float(amount)
|
||||
|
||||
@property
|
||||
def resourceName(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def amount(self):
|
||||
return self._amount
|
||||
|
||||
|
||||
class TaskResourceRequests(object):
|
||||
|
||||
"""
|
||||
.. note:: Evolving
|
||||
|
||||
A set of task resource requests. This is used in conjuntion with the
|
||||
:class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
|
||||
needed for an RDD that will be applied at the stage level.
|
||||
|
||||
.. versionadded:: 3.1.0
|
||||
"""
|
||||
|
||||
_CPUS = "cpus"
|
||||
|
||||
def __init__(self, _jvm=None, _requests=None):
|
||||
from pyspark import SparkContext
|
||||
_jvm = _jvm or SparkContext._jvm
|
||||
if _jvm is not None:
|
||||
self._java_task_resource_requests = \
|
||||
SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
|
||||
if _requests is not None:
|
||||
for k, v in _requests.items():
|
||||
if k == self._CPUS:
|
||||
self._java_task_resource_requests.cpus(int(v.amount))
|
||||
else:
|
||||
self._java_task_resource_requests.resource(v.resourceName, v.amount)
|
||||
else:
|
||||
self._java_task_resource_requests = None
|
||||
self._task_resources = {}
|
||||
|
||||
def cpus(self, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.cpus(amount)
|
||||
else:
|
||||
self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
|
||||
return self
|
||||
|
||||
def resource(self, resourceName, amount):
|
||||
if self._java_task_resource_requests is not None:
|
||||
self._java_task_resource_requests.resource(resourceName, float(amount))
|
||||
else:
|
||||
self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
|
||||
return self
|
||||
|
||||
@property
|
||||
def requests(self):
|
||||
if self._java_task_resource_requests is not None:
|
||||
result = {}
|
||||
taskRes = self._java_task_resource_requests.requestsJMap()
|
||||
for k, v in taskRes.items():
|
||||
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
|
||||
return result
|
||||
else:
|
||||
return self._task_resources
|
16
python/pyspark/resource/tests/__init__.py
Normal file
16
python/pyspark/resource/tests/__init__.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
82
python/pyspark/resource/tests/test_resources.py
Normal file
82
python/pyspark/resource/tests/test_resources.py
Normal file
|
@ -0,0 +1,82 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import random
|
||||
import unittest
|
||||
|
||||
from pyspark.resource import ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder,\
|
||||
TaskResourceRequests
|
||||
|
||||
|
||||
class ResourceProfileTests(unittest.TestCase):
|
||||
def test_profile_before_sc(self):
|
||||
rpb = ResourceProfileBuilder()
|
||||
ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
|
||||
ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
|
||||
treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
|
||||
|
||||
def assert_request_contents(exec_reqs, task_reqs):
|
||||
self.assertEqual(len(exec_reqs), 5)
|
||||
self.assertEqual(exec_reqs["cores"].amount, 2)
|
||||
self.assertEqual(exec_reqs["memory"].amount, 6144)
|
||||
self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024)
|
||||
self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048)
|
||||
self.assertEqual(exec_reqs["gpu"].amount, 2)
|
||||
self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus")
|
||||
self.assertEqual(exec_reqs["gpu"].resourceName, "gpu")
|
||||
self.assertEqual(exec_reqs["gpu"].vendor, "nvidia.com")
|
||||
self.assertEqual(len(task_reqs), 2)
|
||||
self.assertEqual(task_reqs["cpus"].amount, 2.0)
|
||||
self.assertEqual(task_reqs["gpu"].amount, 2.0)
|
||||
|
||||
assert_request_contents(ereqs.requests, treqs.requests)
|
||||
rp = rpb.require(ereqs).require(treqs).build
|
||||
assert_request_contents(rp.executorResources, rp.taskResources)
|
||||
from pyspark import SparkContext, SparkConf
|
||||
sc = SparkContext(conf=SparkConf())
|
||||
rdd = sc.parallelize(range(10)).withResources(rp)
|
||||
return_rp = rdd.getResourceProfile()
|
||||
assert_request_contents(return_rp.executorResources, return_rp.taskResources)
|
||||
# intermix objects created before SparkContext init and after
|
||||
rpb2 = ResourceProfileBuilder()
|
||||
# use reqs created before SparkContext with Builder after
|
||||
rpb2.require(ereqs)
|
||||
rpb2.require(treqs)
|
||||
rp2 = rpb2.build
|
||||
self.assertTrue(rp2.id > 0)
|
||||
rdd2 = sc.parallelize(range(10)).withResources(rp2)
|
||||
return_rp2 = rdd2.getResourceProfile()
|
||||
assert_request_contents(return_rp2.executorResources, return_rp2.taskResources)
|
||||
ereqs2 = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
|
||||
ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
|
||||
treqs2 = TaskResourceRequests().cpus(2).resource("gpu", 2)
|
||||
# use reqs created after SparkContext with Builder before
|
||||
rpb.require(ereqs2)
|
||||
rpb.require(treqs2)
|
||||
rp3 = rpb.build
|
||||
assert_request_contents(rp3.executorResources, rp3.taskResources)
|
||||
sc.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pyspark.resource.tests.test_resources import *
|
||||
|
||||
try:
|
||||
import xmlrunner
|
||||
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
|
||||
except ImportError:
|
||||
testRunner = None
|
||||
unittest.main(testRunner=testRunner, verbosity=2)
|
|
@ -26,6 +26,8 @@ from glob import glob
|
|||
from py4j.protocol import Py4JJavaError
|
||||
|
||||
from pyspark import shuffle, RDD
|
||||
from pyspark.resource import ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder,\
|
||||
TaskResourceRequests
|
||||
from pyspark.serializers import CloudPickleSerializer, BatchedSerializer, PickleSerializer,\
|
||||
MarshalSerializer, UTF8Deserializer, NoOpSerializer
|
||||
from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
|
||||
|
@ -783,6 +785,34 @@ class RDDTests(ReusedPySparkTestCase):
|
|||
for i in range(4):
|
||||
self.assertEqual(i, next(it))
|
||||
|
||||
def test_resourceprofile(self):
|
||||
rp_builder = ResourceProfileBuilder()
|
||||
ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
|
||||
ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
|
||||
treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
|
||||
|
||||
def assert_request_contents(exec_reqs, task_reqs):
|
||||
self.assertEqual(len(exec_reqs), 5)
|
||||
self.assertEqual(exec_reqs["cores"].amount, 2)
|
||||
self.assertEqual(exec_reqs["memory"].amount, 6144)
|
||||
self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024)
|
||||
self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048)
|
||||
self.assertEqual(exec_reqs["gpu"].amount, 2)
|
||||
self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus")
|
||||
self.assertEqual(exec_reqs["gpu"].resourceName, "gpu")
|
||||
self.assertEqual(exec_reqs["gpu"].vendor, "nvidia.com")
|
||||
self.assertEqual(len(task_reqs), 2)
|
||||
self.assertEqual(task_reqs["cpus"].amount, 2.0)
|
||||
self.assertEqual(task_reqs["gpu"].amount, 2.0)
|
||||
|
||||
assert_request_contents(ereqs.requests, treqs.requests)
|
||||
rp = rp_builder.require(ereqs).require(treqs).build
|
||||
assert_request_contents(rp.executorResources, rp.taskResources)
|
||||
rdd = self.sc.parallelize(range(10)).withResources(rp)
|
||||
return_rp = rdd.getResourceProfile()
|
||||
assert_request_contents(return_rp.executorResources, return_rp.taskResources)
|
||||
rddWithoutRp = self.sc.parallelize(range(10))
|
||||
self.assertEqual(rddWithoutRp.getResourceProfile(), None)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import unittest
|
||||
|
|
|
@ -29,6 +29,7 @@ except ImportError:
|
|||
|
||||
from py4j.protocol import Py4JJavaError
|
||||
|
||||
from pyspark import SparkConf, SparkContext
|
||||
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
|
@ -180,10 +181,14 @@ class WorkerReuseTest(PySparkTestCase):
|
|||
not has_resource_module,
|
||||
"Memory limit feature in Python worker is dependent on "
|
||||
"Python's 'resource' module; however, not found.")
|
||||
class WorkerMemoryTest(PySparkTestCase):
|
||||
class WorkerMemoryTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
class_name = self.__class__.__name__
|
||||
conf = SparkConf().set("spark.executor.pyspark.memory", "2g")
|
||||
self.sc = SparkContext('local[4]', class_name, conf=conf)
|
||||
|
||||
def test_memory_limit(self):
|
||||
self.sc._conf.set("spark.executor.pyspark.memory", "2g")
|
||||
rdd = self.sc.parallelize(xrange(1), 1)
|
||||
|
||||
def getrlimit():
|
||||
|
@ -197,6 +202,8 @@ class WorkerMemoryTest(PySparkTestCase):
|
|||
self.assertEqual(soft_limit, 2 * 1024 * 1024 * 1024)
|
||||
self.assertEqual(hard_limit, 2 * 1024 * 1024 * 1024)
|
||||
|
||||
def tearDown(self):
|
||||
self.sc.stop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import unittest
|
||||
|
|
|
@ -168,6 +168,21 @@ ________________________________________________________________________________
|
|||
})
|
||||
|
||||
|
||||
def _parse_memory(s):
|
||||
"""
|
||||
Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
|
||||
return the value in MiB
|
||||
|
||||
>>> _parse_memory("256m")
|
||||
256
|
||||
>>> _parse_memory("2g")
|
||||
2048
|
||||
"""
|
||||
units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
|
||||
if s[-1].lower() not in units:
|
||||
raise ValueError("invalid format: " + s)
|
||||
return int(float(s[:-1]) * units[s[-1].lower()])
|
||||
|
||||
if __name__ == "__main__":
|
||||
import doctest
|
||||
(failure_count, test_count) = doctest.testmod()
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
Worker that receives input from Piped RDD.
|
||||
"""
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
|
|
@ -184,6 +184,7 @@ try:
|
|||
'pyspark.python.lib',
|
||||
'pyspark.data',
|
||||
'pyspark.licenses',
|
||||
'pyspark.resource',
|
||||
'pyspark.examples.src.main.python'],
|
||||
include_package_data=True,
|
||||
package_dir={
|
||||
|
|
Loading…
Reference in a new issue