[SPARK-35083][CORE] Support remote scheduler pool files

### What changes were proposed in this pull request?

Use the Hadoop `FileSystem` API instead of `FileInputStream` to open the scheduler allocation file.
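
The gist, as a minimal sketch (the helper name `openSchedulerFile` is invented here for illustration; the patch inlines this logic in `FairSchedulableBuilder`):

```scala
import java.io.InputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper showing the substitution: rather than
// `new FileInputStream(f)` (local filesystem only), resolve the
// path through whichever Hadoop FileSystem matches its URI scheme.
def openSchedulerFile(f: String, hadoopConf: Configuration): InputStream = {
  val filePath = new Path(f)
  // Local paths, hdfs://, and (on Hadoop 2.9+) http:// all resolve here.
  filePath.getFileSystem(hadoopConf).open(filePath)
}
```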

### Why are the changes needed?

Make `spark.scheduler.allocation.file` support remote files. When using Spark as a server (e.g. SparkThriftServer), it is hard for users to provide a local path as the scheduler pool file.
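
For example, a long-running SparkThriftServer deployment can point every session at one shared pool file (illustrative snippet; the HDFS path is arbitrary):

```scala
import org.apache.spark.SparkConf

// A single pool file on HDFS is visible to the whole cluster,
// so no local copy has to be provisioned on the server host.
val conf = new SparkConf()
  .set("spark.scheduler.allocation.file", "hdfs:///tmp/fairscheduler.xml")
```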

### Does this PR introduce _any_ user-facing change?

Yes, a minor feature.

### How was this patch tested?

Passes `core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala`, plus a manual test: after setting `spark.scheduler.allocation.file=hdfs:///tmp/fairscheduler.xml`, the configured pool shows up in the UI.
![pool1](https://user-images.githubusercontent.com/12025282/114810037-df065700-9ddd-11eb-8d7a-54b59a07ee7b.jpg)
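
For reference, a pool file of the shape exercised here might look like the following (hypothetical contents, modeled on `conf/fairscheduler.xml.template` and the values `PoolSuite` asserts; the actual test fixture is `fairscheduler-with-valid-data.xml`):

```xml
<?xml version="1.0"?>
<allocations>
  <!-- Each <pool> sets a scheduling mode, weight, and minimum share. -->
  <pool name="pool1">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>3</minShare>
  </pool>
  <pool name="pool2">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
</allocations>
```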

Closes #32184 from ulysses-you/SPARK-35083.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit 345c380778 (parent 3f4c32b3ca)
Committed by: Dongjoon Hyun, 2021-04-16 00:18:35 -07:00
4 changed files with 48 additions and 15 deletions

#### core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala

```diff
@@ -17,13 +17,15 @@
 package org.apache.spark.scheduler
 
-import java.io.{FileInputStream, InputStream}
+import java.io.InputStream
 import java.util.{Locale, NoSuchElementException, Properties}
 
 import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -54,10 +56,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
+  val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
   val DEFAULT_POOL_NAME = "default"
@@ -74,7 +76,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
     var fileData: Option[(InputStream, String)] = None
     try {
      fileData = schedulerAllocFile.map { f =>
-        val fis = new FileInputStream(f)
+        val filePath = new Path(f)
+        val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)
         logInfo(s"Creating Fair Scheduler pools from $f")
         Some((fis, f))
       }.getOrElse {
```

#### core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

```diff
@@ -205,7 +205,7 @@ private[spark] class TaskSchedulerImpl(
         case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
         case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool, conf)
+          new FairSchedulableBuilder(rootPool, sc)
         case _ =>
           throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
             s"$schedulingMode")
```

#### core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

```diff
@@ -20,10 +20,14 @@ package org.apache.spark.scheduler
 import java.io.FileNotFoundException
 import java.util.Properties
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode._
+import org.apache.spark.util.Utils
 
 /**
  * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -87,7 +91,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Ensure that the XML file was read in correctly.
@@ -185,9 +189,10 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
       .getFile()
     val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
+    sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -239,7 +244,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Submit a new task set manager with pool properties set to null. This should result
@@ -267,7 +272,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     assert(rootPool.getSchedulableByName(TEST_POOL) === null)
@@ -302,7 +307,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -317,7 +322,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -332,12 +337,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     intercept[FileNotFoundException] {
       schedulableBuilder.buildPools()
     }
   }
 
+  test("SPARK-35083: Support remote scheduler pool file") {
+    val hadoopVersion = VersionInfo.getVersion.split("\\.")
+    // HttpFileSystem supported since hadoop 2.9
+    assume(hadoopVersion.head.toInt >= 3 ||
+      (hadoopVersion.head.toInt == 2 && hadoopVersion(1).toInt >= 9))
+
+    val xmlPath = new Path(
+      Utils.getSparkClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile)
+    TestUtils.withHttpServer(xmlPath.getParent.toUri.getPath) { baseURL =>
+      val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE,
+        baseURL + "fairscheduler-with-valid-data.xml")
+      sc = new SparkContext(LOCAL, APP_NAME, conf)
+
+      val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+      val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
+      schedulableBuilder.buildPools()
+
+      verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+      verifyPool(rootPool, "pool1", 3, 1, FIFO)
+      verifyPool(rootPool, "pool2", 4, 2, FAIR)
+      verifyPool(rootPool, "pool3", 2, 3, FAIR)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
       expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
```

#### docs/job-scheduling.md

```diff
@@ -252,10 +252,11 @@ properties:
 
 The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
 and either putting a file named `fairscheduler.xml` on the classpath, or setting `spark.scheduler.allocation.file` property in your
-[SparkConf](configuration.html#spark-properties).
+[SparkConf](configuration.html#spark-properties). The file path can either be a local file path or HDFS file path.
 
 {% highlight scala %}
 conf.set("spark.scheduler.allocation.file", "/path/to/file")
+conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
 {% endhighlight %}
 
 The format of the XML file is simply a `<pool>` element for each pool, with different elements
```