[SPARK-29153][CORE] Add ability to merge resource profiles within a stage with Stage Level Scheduling

### What changes were proposed in this pull request?

For the stage level scheduling feature, add the ability to optionally merge resource profiles if they were specified on multiple RDDs within a stage. There is a config to enable this feature; it is off by default (spark.scheduler.resource.profileMergeConflicts). When the config is set to true, Spark merges the profiles, selecting the max value of each resource (cores, memory, gpu, etc.). Further documentation will be added with SPARK-30322.

This also adds the ability to check whether an equivalent resource profile already exists, so that a user who runs stages combining the same profiles over and over again doesn't cause an explosion in the number of profiles.
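
To make the intended usage concrete, here is a minimal sketch (API names are taken from the tests in this PR; at the time of this change custom ResourceProfiles are only supported on YARN with dynamic allocation, so treat this as illustrative rather than copy-paste runnable):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Opt in to merging conflicting profiles within a stage (off by default).
val conf = new SparkConf()
  .setAppName("profile-merge-sketch")
  .set("spark.scheduler.resource.profileMergeConflicts", "true")
val sc = new SparkContext(conf)

// Two different profiles attached to RDDs that land in the same stage
// (the map() below does not introduce a stage boundary).
val rp1 = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4))
  .require(new TaskResourceRequests().cpus(1))
  .build
val rp2 = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(2))
  .require(new TaskResourceRequests().cpus(2))
  .build

// With the config on, the stage runs with a merged profile that takes the max
// of each resource (executor cores = 4, task cpus = 2); with it off, stage
// submission throws IllegalArgumentException.
val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
```

If the same merged profile is produced again later, the equivalence check returns the already-registered profile instead of adding a new one.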

### Why are the changes needed?

To allow users to specify resources on multiple RDDs without having to worry as much about whether those RDDs end up in the same stage and the job then fails.

### Does this PR introduce any user-facing change?

Yes, when the config is turned on, Spark now merges the profiles instead of erroring out.
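
To make the merge rule concrete, here is a tiny self-contained sketch of the max-per-resource semantics (not the actual Spark code, which operates on a ResourceProfile's executorResources and taskResources maps of resource requests):

```scala
// Hedged sketch: for each resource name present in either profile,
// keep the larger requested amount.
def maxMerge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  (a.keySet ++ b.keySet).map { k =>
    k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))
  }.toMap

// Executor side: cores 4 vs 2 -> 4; a gpu present in only one profile is kept.
maxMerge(Map("cores" -> 4L), Map("cores" -> 2L, "gpu" -> 1L))
// => Map(cores -> 4, gpu -> 1)
```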

### How was this patch tested?

Unit tests

Closes #28053 from tgravescs/SPARK-29153.

Lead-authored-by: Thomas Graves <tgraves@apache.org>
Co-authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Thomas Graves 2020-04-02 08:30:18 -05:00
parent ec28925236
commit 55dea9be62
8 changed files with 304 additions and 15 deletions

core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -1816,4 +1816,14 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createOptional
private[spark] val RESOURCE_PROFILE_MERGE_CONFLICTS =
ConfigBuilder("spark.scheduler.resource.profileMergeConflicts")
.doc("If set to true, Spark will merge ResourceProfiles when different profiles " +
"are specified in RDDs that get combined into a single stage. When they are merged, " +
"Spark chooses the maximum of each resource and creates a new ResourceProfile. The " +
"default of false results in Spark throwing an exception if multiple different " +
"ResourceProfiles are found in RDDs going into the same stage.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
}

core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala

@@ -222,6 +222,11 @@ class ResourceProfile(
}
}
// check that the task resources and executor resources are equal, but ids could be different
private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = {
rp.taskResources == taskResources && rp.executorResources == executorResources
}
override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
override def toString(): String = {

core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala

@@ -17,7 +17,9 @@
package org.apache.spark.resource
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.HashMap
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.Evolving
@@ -34,7 +36,12 @@ import org.apache.spark.util.Utils.isTesting
*/
@Evolving
private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
private val (readLock, writeLock) = {
val lock = new ReentrantReadWriteLock()
(lock.readLock(), lock.writeLock())
}
private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
addResourceProfile(defaultProfile)
@@ -61,10 +68,20 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
def addResourceProfile(rp: ResourceProfile): Unit = {
isSupported(rp)
var putNewProfile = false
writeLock.lock()
try {
if (!resourceProfileIdToResourceProfile.contains(rp.id)) {
val prev = resourceProfileIdToResourceProfile.put(rp.id, rp)
if (prev.isEmpty) putNewProfile = true
}
} finally {
writeLock.unlock()
}
// do this outside the write lock only when we add a new profile
if (putNewProfile) {
// force the computation of maxTasks and limitingResource now so we don't have cost later
rp.limitingResource(sparkConf)
logInfo(s"Added ResourceProfile id: ${rp.id}")
}
}
@@ -74,10 +91,28 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
* it returns the default ResourceProfile created from the application level configs.
*/
def resourceProfileFromId(rpId: Int): ResourceProfile = {
readLock.lock()
try {
resourceProfileIdToResourceProfile.get(rpId).getOrElse(
throw new SparkException(s"ResourceProfileId $rpId not found!")
)
} finally {
readLock.unlock()
}
}
/*
* If the ResourceProfile passed in is equivalent to an existing one, return the
* existing one, otherwise return None
*/
def getEquivalentProfile(rp: ResourceProfile): Option[ResourceProfile] = {
readLock.lock()
try {
resourceProfileIdToResourceProfile.find { case (_, rpEntry) =>
rpEntry.resourcesEqual(rp)
}.map(_._2)
} finally {
readLock.unlock()
}
}
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -186,6 +186,8 @@ private[spark] class DAGScheduler(
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
private val shouldMergeResourceProfiles = sc.getConf.get(config.RESOURCE_PROFILE_MERGE_CONFLICTS)
/**
* Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
* this is set default to false, which means, we only unregister the outputs related to the exact
@@ -447,10 +449,27 @@ private[spark] class DAGScheduler(
stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
logDebug(s"Merging stage rdd profiles: $stageResourceProfiles")
val resourceProfile = if (stageResourceProfiles.size > 1) {
if (shouldMergeResourceProfiles) {
val startResourceProfile = stageResourceProfiles.head
val mergedProfile = stageResourceProfiles.drop(1)
.foldLeft(startResourceProfile)((a, b) => mergeResourceProfiles(a, b))
// compare the merged profile with existing ones so we don't add it over and over again
// if the user runs the same operation multiple times
val resProfile = sc.resourceProfileManager.getEquivalentProfile(mergedProfile)
resProfile match {
case Some(existingRp) => existingRp
case None =>
// this ResourceProfile could be different if it was merged so we have to add it to
// our ResourceProfileManager
sc.resourceProfileManager.addResourceProfile(mergedProfile)
mergedProfile
}
} else {
throw new IllegalArgumentException("Multiple ResourceProfiles specified in the RDDs for " +
"this stage, either resolve the conflicting ResourceProfiles yourself or enable " +
s"${config.RESOURCE_PROFILE_MERGE_CONFLICTS.key} and understand how Spark handles " +
"the merging them.")
}
} else {
if (stageResourceProfiles.size == 1) {
stageResourceProfiles.head
@@ -461,6 +480,27 @@
resourceProfile
}
// This is a basic function to merge resource profiles that takes the max
// value of the profiles. We may want to make this more complex in the future as
// you may want to sum some resources (like memory).
private[scheduler] def mergeResourceProfiles(
r1: ResourceProfile,
r2: ResourceProfile): ResourceProfile = {
val mergedExecKeys = r1.executorResources ++ r2.executorResources
val mergedExecReq = mergedExecKeys.map { case (k, v) =>
val larger = r1.executorResources.get(k).map( x =>
if (x.amount > v.amount) x else v).getOrElse(v)
k -> larger
}
val mergedTaskKeys = r1.taskResources ++ r2.taskResources
val mergedTaskReq = mergedTaskKeys.map { case (k, v) =>
val larger = r1.taskResources.get(k).map( x =>
if (x.amount > v.amount) x else v).getOrElse(v)
k -> larger
}
new ResourceProfile(mergedExecReq, mergedTaskReq)
}
/**
* Create a ResultStage associated with the provided jobId.
*/

core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala

@@ -98,6 +98,36 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
}
test("ResourceProfileManager has equivalent profile") {
val conf = new SparkConf().set(EXECUTOR_CORES, 4)
val rpmanager = new ResourceProfileManager(conf)
var rpAlreadyExist: Option[ResourceProfile] = None
val checkId = 500
for (i <- 1 to 1000) {
val rprofBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.cores(i).memory("4g").memoryOverhead("2000m")
val treqs = new TaskResourceRequests()
treqs.cpus(i)
rprofBuilder.require(ereqs).require(treqs)
val rprof = rprofBuilder.build
rpmanager.addResourceProfile(rprof)
if (i == checkId) rpAlreadyExist = Some(rprof)
}
val rpNotMatch = new ResourceProfileBuilder().build
assert(rpmanager.getEquivalentProfile(rpNotMatch).isEmpty,
s"resourceProfile should not have existed")
val rprofBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.cores(checkId).memory("4g").memoryOverhead("2000m")
val treqs = new TaskResourceRequests()
treqs.cpus(checkId)
rprofBuilder.require(ereqs).require(treqs)
val rpShouldMatch = rprofBuilder.build
val equivProf = rpmanager.getEquivalentProfile(rpShouldMatch)
assert(equivProf.nonEmpty)
assert(equivProf.get.id == rpAlreadyExist.get.id, s"resourceProfile should have existed")
}
}

core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala

@@ -193,6 +193,26 @@ class ResourceProfileSuite extends SparkFunSuite {
assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
}
test("test ResourceProfiles equal") {
val rprofBuilder = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
rprofBuilder.require(taskReq).require(eReq)
val rprof = rprofBuilder.build
val rprofBuilder2 = new ResourceProfileBuilder()
val taskReq2 = new TaskResourceRequests().resource("gpu", 1)
val eReq2 = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
rprofBuilder2.require(taskReq2).require(eReq2)
val rprof2 = rprofBuilder2.build
rprof2.setResourceProfileId(rprof.id)
assert(rprof === rprof2, "resource profile equality not working")
rprof2.setResourceProfileId(rprof.id + 1)
assert(rprof.id != rprof2.id, "resource profiles should not have same id")
assert(rprof.resourcesEqual(rprof2), "resource profile resourcesEqual not working")
}
test("Test ExecutorResourceRequests memory helpers") {
val rprof = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -35,6 +35,7 @@ import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config
import org.apache.spark.rdd.{DeterministicLevel, RDD}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -3184,7 +3185,144 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits {
scheduler.mergeResourceProfilesForStage(resourceprofiles)
}.getMessage()
assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
assert(error.contains("Multiple ResourceProfiles specified in the RDDs"))
}
test("test 2 resource profile with merge conflict config true") {
afterEach()
val conf = new SparkConf()
conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
init(conf)
val ereqs = new ExecutorResourceRequests().cores(4)
val treqs = new TaskResourceRequests().cpus(1)
val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
val ereqs2 = new ExecutorResourceRequests().cores(2)
val treqs2 = new TaskResourceRequests().cpus(2)
val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
val mergedRp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 4)
}
test("test multiple resource profiles created from merging use same rp") {
afterEach()
val conf = new SparkConf()
conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
init(conf)
val ereqs = new ExecutorResourceRequests().cores(4)
val treqs = new TaskResourceRequests().cpus(1)
val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
val ereqs2 = new ExecutorResourceRequests().cores(2)
val treqs2 = new TaskResourceRequests().cpus(2)
val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
val (_, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
val mergedRp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 4)
// test that instead of creating a new merged profile, we use the already created one
val rdd2 = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
val (_, resourceprofiles2) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd2)
val mergedRp2 = scheduler.mergeResourceProfilesForStage(resourceprofiles2)
assert(mergedRp2.id === mergedRp.id)
assert(mergedRp2.getTaskCpus.get == 2)
assert(mergedRp2.getExecutorCores.get == 4)
}
test("test merge 2 resource profiles multiple configs") {
val ereqs = new ExecutorResourceRequests().cores(4)
val treqs = new TaskResourceRequests().cpus(2)
val rp1 = new ResourceProfile(ereqs.requests, treqs.requests)
val ereqs2 = new ExecutorResourceRequests().cores(2)
val treqs2 = new TaskResourceRequests().cpus(1)
val rp2 = new ResourceProfile(ereqs2.requests, treqs2.requests)
var mergedRp = scheduler.mergeResourceProfiles(rp1, rp2)
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 4)
val ereqs3 = new ExecutorResourceRequests().cores(1).resource(GPU, 1, "disc")
val treqs3 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
val ereqs4 = new ExecutorResourceRequests().cores(2)
val treqs4 = new TaskResourceRequests().cpus(2)
val rp4 = new ResourceProfile(ereqs4.requests, treqs4.requests)
mergedRp = scheduler.mergeResourceProfiles(rp3, rp4)
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 2)
assert(mergedRp.executorResources.size == 2)
assert(mergedRp.taskResources.size == 2)
assert(mergedRp.executorResources.get(GPU).get.amount == 1)
assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
assert(mergedRp.taskResources.get(GPU).get.amount == 1)
val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g")
.memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc")
val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests)
val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc")
val treqs6 = new TaskResourceRequests().cpus(2).resource(FPGA, 1)
val rp6 = new ResourceProfile(ereqs6.requests, treqs6.requests)
mergedRp = scheduler.mergeResourceProfiles(rp5, rp6)
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 8)
assert(mergedRp.executorResources.size == 6)
assert(mergedRp.taskResources.size == 3)
assert(mergedRp.executorResources.get(GPU).get.amount == 1)
assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
assert(mergedRp.taskResources.get(GPU).get.amount == 1)
assert(mergedRp.executorResources.get(FPGA).get.amount == 2)
assert(mergedRp.executorResources.get(FPGA).get.discoveryScript == "fdisc")
assert(mergedRp.taskResources.get(FPGA).get.amount == 1)
assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072)
assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048)
assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024)
val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g")
.resource(GPU, 4, "disc")
val treqs7 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
val rp7 = new ResourceProfile(ereqs7.requests, treqs7.requests)
val ereqs8 = new ExecutorResourceRequests().cores(1).resource(GPU, 2, "fdisc")
val treqs8 = new TaskResourceRequests().cpus(1).resource(GPU, 2)
val rp8 = new ResourceProfile(ereqs8.requests, treqs8.requests)
mergedRp = scheduler.mergeResourceProfiles(rp7, rp8)
assert(mergedRp.getTaskCpus.get == 1)
assert(mergedRp.getExecutorCores.get == 1)
assert(mergedRp.executorResources.get(GPU).get.amount == 4)
assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
assert(mergedRp.taskResources.get(GPU).get.amount == 2)
}
test("test merge 3 resource profiles") {
afterEach()
val conf = new SparkConf()
conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
init(conf)
val ereqs = new ExecutorResourceRequests().cores(4)
val treqs = new TaskResourceRequests().cpus(1)
val rp1 = new ResourceProfile(ereqs.requests, treqs.requests)
val ereqs2 = new ExecutorResourceRequests().cores(2)
val treqs2 = new TaskResourceRequests().cpus(1)
val rp2 = new ResourceProfile(ereqs2.requests, treqs2.requests)
val ereqs3 = new ExecutorResourceRequests().cores(3)
val treqs3 = new TaskResourceRequests().cpus(2)
val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 4)
}
/**

docs/configuration.md

@@ -2135,6 +2135,17 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.scheduler.resource.profileMergeConflicts</code></td>
<td>false</td>
<td>
If set to "true", Spark will merge ResourceProfiles when different profiles are specified
in RDDs that get combined into a single stage. When they are merged, Spark chooses the maximum of
each resource and creates a new ResourceProfile. The default of false results in Spark throwing
an exception if multiple different ResourceProfiles are found in RDDs going into the same stage.
</td>
<td>3.1.0</td>
</tr>
<tr>
<td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
<td>120s</td>