[SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default
Minimal PR for round-robin scheduling of receivers. Dense scheduling can be enabled by setting preferredLocation, so a new config parameter isn't really needed. Tested this on a cluster of 6 nodes and observed a 20-25% gain in throughput compared to random scheduling.

tdas pwendell

Author: nishkamravi2 <nishkamravi@gmail.com>
Author: Nishkam Ravi <nravi@cloudera.com>

Closes #6607 from nishkamravi2/master_nravi and squashes the following commits:

1918819 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
f747739 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
6127e58 [Nishkam Ravi] Update ReceiverTracker and ReceiverTrackerSuite
9f1abc2 [nishkamravi2] Update ReceiverTrackerSuite.scala
ae29152 [Nishkam Ravi] Update test suite with TD's suggestions
48a4a97 [nishkamravi2] Update ReceiverTracker.scala
bc23907 [nishkamravi2] Update ReceiverTracker.scala
68e8540 [nishkamravi2] Update SchedulerSuite.scala
4604f28 [nishkamravi2] Update SchedulerSuite.scala
179b90f [nishkamravi2] Update ReceiverTracker.scala
242e677 [nishkamravi2] Update SchedulerSuite.scala
7f3e028 [Nishkam Ravi] Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
f8a3e05 [nishkamravi2] Update ReceiverTracker.scala
4cf97b6 [nishkamravi2] Update ReceiverTracker.scala
16e84ec [Nishkam Ravi] Update ReceiverTracker.scala
45e3a99 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
02dbdb8 [Nishkam Ravi] Update ReceiverTracker.scala
07b9dfa [nishkamravi2] Update ReceiverTracker.scala
6caeefe [nishkamravi2] Update ReceiverTracker.scala
7888257 [nishkamravi2] Update ReceiverTracker.scala
6e3515c [Nishkam Ravi] Minor changes
975b8d8 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
3cac21b [Nishkam Ravi] Generalize the scheduling algorithm
b05ee2f [nishkamravi2] Update ReceiverTracker.scala
bb5e09b [Nishkam Ravi] Add a new var in receiver to store location information for round-robin scheduling
41705de [nishkamravi2] Update ReceiverTracker.scala
fff1b2e [Nishkam Ravi] Round-robin scheduling of streaming receivers
parent 9213f73a8e
commit ca7e460f7d
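Before the diff itself, a note on the dense-scheduling escape hatch mentioned in the commit message: it is simply a receiver that reports a fixed host via preferredLocation. A minimal hedged sketch follows; the class name and host handling are illustrative, not part of this patch.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver pinned to a single host. Because preferredLocation is
// defined, the scheduler keeps it there instead of spreading it round-robin.
class PinnedReceiver(host: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def preferredLocation: Option[String] = Some(host)
  def onStart(): Unit = { /* open the source and call store(...) as data arrives */ }
  def onStop(): Unit = { /* release anything opened in onStart */ }
}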
ReceiverTracker.scala:

@@ -17,8 +17,10 @@
 package org.apache.spark.streaming.scheduler
 
-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
 import scala.language.existentials
+import scala.math.max
 
+import org.apache.spark.rdd._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -272,6 +274,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false)
       }
     }
+
+    /**
+     * Get the list of executors excluding driver
+     */
+    private def getExecutors(ssc: StreamingContext): List[String] = {
+      val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+      val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+      executors.diff(List(driver))
+    }
+
+    /** Set host location(s) for each receiver so as to distribute them over
+     * executors in a round-robin fashion taking into account preferredLocation if set
+     */
+    private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
+      executors: List[String]): Array[ArrayBuffer[String]] = {
+      val locations = new Array[ArrayBuffer[String]](receivers.length)
+      var i = 0
+      for (i <- 0 until receivers.length) {
+        locations(i) = new ArrayBuffer[String]()
+        if (receivers(i).preferredLocation.isDefined) {
+          locations(i) += receivers(i).preferredLocation.get
+        }
+      }
+      var count = 0
+      for (i <- 0 until max(receivers.length, executors.length)) {
+        if (!receivers(i % receivers.length).preferredLocation.isDefined) {
+          locations(i % receivers.length) += executors(count)
+          count += 1
+          if (count == executors.length) {
+            count = 0
+          }
+        }
+      }
+      locations
+    }
 
     /**
      * Get the receivers from the ReceiverInputDStreams, distributes them to the
      * worker nodes as a parallel collection, and runs them.
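To make the allocation above concrete, here is a small self-contained sketch of the same round-robin pass in plain Scala (no StreamingContext; the object and method names are ours, not part of the patch). Running it reproduces one of the cases the new test suite checks: 5 receivers with no preferred location on 4 executors come out as 0 | 1 | 2 | 3 | 0.

import scala.collection.mutable.ArrayBuffer
import scala.math.max

// Standalone mirror of scheduleReceivers: preferred locations are modeled as
// plain Options so the logic can run without any Spark classes.
object RoundRobinSketch {
  def schedule(preferred: Seq[Option[String]], executors: List[String]): Array[ArrayBuffer[String]] = {
    val locations = Array.fill(preferred.length)(new ArrayBuffer[String]())
    // First honor any explicit preferences.
    for (i <- preferred.indices; host <- preferred(i)) locations(i) += host
    // Then hand out executors round-robin to the receivers without a preference.
    var count = 0
    for (i <- 0 until max(preferred.length, executors.length)) {
      if (preferred(i % preferred.length).isEmpty) {
        locations(i % preferred.length) += executors(count)
        count = (count + 1) % executors.length
      }
    }
    locations
  }

  def main(args: Array[String]): Unit = {
    // 5 unpinned receivers, 4 executors -> prints "0 | 1 | 2 | 3 | 0"
    val out = schedule(Seq.fill(5)(None), List("0", "1", "2", "3"))
    println(out.map(_.mkString(",")).mkString(" | "))
  }
}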
@@ -283,18 +320,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false)
         rcvr
       })
-
-      // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-      // Create the parallel collection of receivers to distributed them on the worker nodes
-      val tempRDD =
-        if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-        } else {
-          ssc.sc.makeRDD(receivers, receivers.size)
-        }
 
       val checkpointDirOption = Option(ssc.checkpointDir)
       val serializableHadoopConf =
         new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
@@ -311,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false)
         supervisor.start()
         supervisor.awaitTermination()
       }
 
       // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
       if (!ssc.sparkContext.isLocal) {
         ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
       }
 
+      // Get the list of executors and schedule receivers
+      val executors = getExecutors(ssc)
+      val tempRDD =
+        if (!executors.isEmpty) {
+          val locations = scheduleReceivers(receivers, executors)
+          val roundRobinReceivers = (0 until receivers.length).map(i =>
+            (receivers(i), locations(i)))
+          ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
+        } else {
+          ssc.sc.makeRDD(receivers, receivers.size)
+        }
+
       // Distribute the receivers and start them
       logInfo("Starting " + receivers.length + " receivers")
       running = true
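The new code path hands makeRDD a sequence of (receiver, hosts) pairs; that overload of SparkContext.makeRDD records the hosts as locality hints and creates one partition per element, which is how the chosen placement reaches the task scheduler. A hedged stand-alone illustration follows; the element and host names are invented and, in local mode, the hints have no real effect.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of makeRDD with per-element location preferences. The explicit
// [String] type parameter selects the (T, Seq[String]) overload, just as the
// patch does with makeRDD[Receiver[_]](roundRobinReceivers).
object LocationHintsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hints"))
    val withHints: Seq[(String, Seq[String])] = Seq(
      ("receiver-0", Seq("host-a")),   // host names are illustrative only
      ("receiver-1", Seq("host-b")))
    val rdd = sc.makeRDD[String](withHints)
    println(rdd.partitions.length)     // 2: one partition per element
    sc.stop()
  }
}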
ReceiverTrackerSuite.scala (new file):

@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming._
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.util.Utils
+
+/** Testsuite for receiver scheduling */
+class ReceiverTrackerSuite extends TestSuiteBase {
+  val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
+  val ssc = new StreamingContext(sparkConf, Milliseconds(100))
+  val tracker = new ReceiverTracker(ssc)
+  val launcher = new tracker.ReceiverLauncher()
+  val executors: List[String] = List("0", "1", "2", "3")
+
+  test("receiver scheduling - all or none have preferred location") {
+
+    def parse(s: String): Array[Array[String]] = {
+      val outerSplit = s.split("\\|")
+      val loc = new Array[Array[String]](outerSplit.length)
+      var i = 0
+      for (i <- 0 until outerSplit.length) {
+        loc(i) = outerSplit(i).split("\\,")
+      }
+      loc
+    }
+
+    def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
+      val receivers =
+        if (preferredLocation) {
+          Array.tabulate(numReceivers)(i => new DummyReceiver(host =
+            Some(((i + 1) % executors.length).toString)))
+        } else {
+          Array.tabulate(numReceivers)(_ => new DummyReceiver)
+        }
+      val locations = launcher.scheduleReceivers(receivers, executors)
+      val expectedLocations = parse(allocation)
+      assert(locations.deep === expectedLocations.deep)
+    }
+
+    testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
+    testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
+    testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
+  }
+
+  test("receiver scheduling - some have preferred location") {
+    val numReceivers = 4;
+    val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
+      new DummyReceiver, new DummyReceiver, new DummyReceiver)
+    val locations = launcher.scheduleReceivers(receivers, executors)
+    assert(locations(0)(0) === "1")
+    assert(locations(1)(0) === "0")
+    assert(locations(2)(0) === "1")
+    assert(locations(0).length === 1)
+    assert(locations(3).length === 1)
+  }
+}
+
+/**
+ * Dummy receiver implementation
+ */
+private class DummyReceiver(host: Option[String] = None)
+  extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  def onStart() {
+  }
+
+  def onStop() {
+  }
+
+  override def preferredLocation: Option[String] = host
+}
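One reading aid for the expected-allocation strings used in the suite: "|" separates receivers and "," separates the hosts assigned to a single receiver, so "0,3|1|2" means receiver 0 runs on executors 0 and 3. A tiny decoder, purely illustrative:

// Decodes the suite's compact allocation notation into a readable listing.
object AllocationFormat {
  def main(args: Array[String]): Unit = {
    val expected = "0,3|1|2".split("\\|").map(_.split(","))
    expected.zipWithIndex.foreach { case (hosts, i) =>
      println(s"receiver $i -> executors ${hosts.mkString(", ")}")
    }
  }
}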