Large-scale load and locality tests for coalesced partitions added

Ali Ghodsi 2013-08-14 19:40:24 -07:00
parent 66edf854aa
commit 7a2a33e32d
2 changed files with 118 additions and 63 deletions
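For orientation, a minimal usage sketch of the locality-aware coalesce that the new test exercises (a sketch only: `sc` is an assumed SparkContext, the machine names are invented, and the cast to CoalescedRDDPartition compiles only from code inside the spark package, since the class is private[spark]):

import spark.rdd.CoalescedRDDPartition

val machines = (1 to 50).map("m" + _)                                      // assumed machine names
val data = sc.makeRDD((1 to 10000).map(i => (i, Seq(machines(i % 50)))))   // elements with preferred locations
val coalesced = data.coalesce(100)                                         // 100 locality-aware partition groups
val minLocality = coalesced.partitions
  .map(_.asInstanceOf[CoalescedRDDPartition].localFraction)                // fraction of co-located parents per group
  .min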

CoalescedRDD.scala

@@ -36,7 +36,21 @@ private[spark] case class CoalescedRDDPartition(
oos.defaultWriteObject()
}
/**
* Gets the preferred location for this coalesced RDD partition. Most parent indices should prefer this machine.
* @return preferred location
*/
def getPreferredLocation = prefLoc
/**
* Computes the fraction of this coalesced partition's parent partitions that list
* getPreferredLocation among their preferredLocations
* @return locality of this coalesced partition, between 0 and 1
*/
def localFraction :Double = {
var loc: Int = 0
parents.foreach(p => if (rdd.preferredLocations(p).contains(getPreferredLocation)) loc += 1)
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
/**
@@ -55,13 +69,9 @@ class CoalescedRDD[T: ClassManifest](
override def getPartitions: Array[Partition] = {
val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
val targetLen = math.min(prev.partitions.length, maxPartitions)
val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
packer.setupGroups(targetLen) // setup the groups (bins) and preferred locations
packer.throwBalls() // assign partitions (balls) to each group (bins)
for ((pg, i) <- packer.groupArr.zipWithIndex) {
for ((pg, i) <- packer.getPartitions.zipWithIndex) {
val ids = pg.list.map(_.index).toArray
res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
@@ -86,27 +96,68 @@ class CoalescedRDD[T: ClassManifest](
super.clearDependencies()
prev = null
}
/**
* Returns the preferred machine for the split. If split is of type CoalescedRDDPartition, then the preferred machine
* will be one which most parent splits prefer too.
* @param split
* @return the machine most preferred by split
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
if (split.isInstanceOf[CoalescedRDDPartition])
List(split.asInstanceOf[CoalescedRDDPartition].getPreferredLocation)
else
super.getPreferredLocations(split)
}
}
class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
private def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
private def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
private val rnd = new scala.util.Random(7919) // keep this class deterministic
// each element of groupArr represents one coalesced partition
private val groupArr = mutable.ArrayBuffer[PartitionGroup]()
// hash used to check whether some machine is already in groupArr
private val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
// hash used for the first maxPartitions (to avoid duplicates)
private val initialHash = mutable.Map[Partition, Boolean]()
// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
private val slack = (balanceSlack * prev.partitions.size).toInt
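// e.g. (illustrative) with 100 parent partitions and balanceSlack = 0.10, slack = 10: locality is
// roughly favored until a preferred machine's group grows about 10 partitions larger than the
// currently least-loaded group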
private var noLocality = true // true if no preferredLocations exist for the parent RDD
this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) and preferred locations
this.throwBalls() // assign partitions (balls) to each group (bins)
def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
// the rotator first goes through the first replica copy of each partition, then second, then third
private class RotateLocations(prev: RDD[_]) extends Iterator[String] {
// the iterator's return type is a tuple: (replicaString, partition)
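// e.g. (illustrative) if p0 prefers (m1, m2) and p1 prefers (m3, m1), next() yields
// (m1, p0), (m3, p1), (m2, p0), (m1, p1) and then starts over from the beginning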
private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
private var it: Iterator[String] = resetIterator()
private var it: Iterator[(String, Partition)] = resetIterator()
override val isEmpty = !it.hasNext
// initializes/resets to start iterating from the beginning
private def resetIterator() = {
val i1 = prev.partitions.view.map( (p: Partition) =>
{ if (prev.preferredLocations(p).length > 0) Some(prev.preferredLocations(p)(0)) else None } )
{ if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0),p)) else None } )
val i2 = prev.partitions.view.map( (p: Partition) =>
{ if (prev.preferredLocations(p).length > 1) Some(prev.preferredLocations(p)(1)) else None } )
{ if (prev.preferredLocations(p).length > 1) Some((prev.preferredLocations(p)(1),p)) else None } )
val i3 = prev.partitions.view.map( (p: Partition) =>
{ if (prev.preferredLocations(p).length > 2) Some(prev.preferredLocations(p)(2)) else None } )
{ if (prev.preferredLocations(p).length > 2) Some((prev.preferredLocations(p)(2),p)) else None } )
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
}
@@ -115,7 +166,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
def hasNext(): Boolean = !isEmpty
// return the next preferredLocation of some partition of the RDD
def next(): String = {
def next(): (String, Partition) = {
if (it.hasNext)
it.next()
else {
@@ -126,42 +177,11 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
case class PartitionGroup(prefLoc: String = "") {
var list = mutable.MutableList[Partition]()
var list = mutable.ListBuffer[Partition]()
def size = list.size
// returns number of partitions that got locality in this group
def local = {
var loc: Int = 0
list.foreach(p => if (prev.preferredLocations(p).contains(prefLoc)) loc += 1)
loc
}
override def toString(): String = {
val localityStr = if (size == 0) "0" else (local*100. / size).toInt.toString
"PartitionGroup(\"" + prefLoc + "\") size: " + size + " locality: " + localityStr +"% \n"
// list.map("\t\t" + _.toString).mkString("\n") + "\n"
}
}
def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
val rnd = new scala.util.Random(7919) // keep this class deterministic
// each element of groupArr represents one coalesced partition
val groupArr = mutable.ArrayBuffer[PartitionGroup]()
// hash used to check whether some machine is already in groupArr
val groupHash = mutable.Map[String, mutable.MutableList[PartitionGroup]]()
// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.size).toInt
private var noLocality = true // true if no preferredLocations exist for the parent RDD
/**
* Sorts and gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
@@ -172,13 +192,21 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
groupHash.get(key).map(_.sortWith(compare).head)
}
def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
if (!initialHash.contains(part)) {
pgroup.list += part // pre-assign this element so that every bucket ends up with at least one element
initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets/groups
true
} else false
}
/**
* Initializes targetLen partition groups and assigns a preferredLocation
* This uses coupon collector to estimate how many preferredLocations it must rotate through until it has seen
* most of the preferred locations (2 * n log(n))
* @param targetLen
*/
def setupGroups(targetLen: Int) {
private def setupGroups(targetLen: Int) {
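// e.g. (illustrative) for n = 50 distinct preferred locations, 2 * 50 * ln(50) is roughly 390
// rotations before most locations are expected to have been seen at least once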
val rotIt = new RotateLocations(prev)
// deal with empty rotator case, just create targetLen partition groups with no preferred location
@@ -199,22 +227,29 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
// likely targetLen >> number of preferred locations (more buckets than there are machines)
while (numCreated < targetLen && tries < expectedCoupons2) {
tries += 1
val nxt = rotIt.next()
if (!groupHash.contains(nxt)) {
val pgroup = PartitionGroup(nxt)
val (nxt_replica, nxt_part) = rotIt.next()
if (!groupHash.contains(nxt_replica)) {
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
groupHash += (nxt -> (mutable.MutableList(pgroup))) // use list in case we have multiple groups for same machine
addPartToPGroup(nxt_part, pgroup)
groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple groups per machine
numCreated += 1
}
}
while (numCreated < targetLen) { // if we don't have enough partition groups, just create duplicates
val nxt = rotIt.next()
val pgroup = PartitionGroup(nxt)
var (nxt_replica, nxt_part) = rotIt.next()
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
groupHash.get(nxt).get += pgroup
groupHash.get(nxt_replica).get += pgroup
var tries = 0
while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure each group has at least one partition
nxt_part = rotIt.next()._2
tries += 1
}
numCreated += 1
}
}
/**
@@ -224,7 +259,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
* @param p partition (ball to be thrown)
* @return partition group (bin to be put in)
*/
def pickBin(p: Partition): PartitionGroup = {
private def pickBin(p: Partition): PartitionGroup = {
val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
val prefPart = if (pref == Nil) None else pref.head
@@ -243,7 +278,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
}
def throwBalls() {
private def throwBalls() {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p,i) <- prev.partitions.zipWithIndex) {
@@ -257,7 +292,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
}
} else {
for (p <- prev.partitions) { // throw every partition (ball) into a partition group (bin)
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into a partition group
pickBin(p).list += p
}
}

RDDSuite.scala

@@ -22,7 +22,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
import spark.rdd._
import scala.collection.parallel.mutable
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -184,16 +185,35 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // decent balance (2+ per bin)
val prefs = List(List("m1","m2","m3"), List("m4","m5","m6"))
val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs
val coalesced2 = data2.coalesce(10)
val splits2 = coalesced2.glom().collect().map(_.toList).toList
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = data.coalesce(20)
assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
(x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).map(x => List(x)).toList)
// this gives a list of pairs, each pair is of the form (even,odd), where even is the number of even elements...
val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) )
val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the max of each pair, this represents the locality
maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
val partitions = 10000
val numMachines = 50
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)
val blocks = (1 to partitions).map( i => (i, (i to (i+2)).map{ j => machines(rnd.nextInt(machines.size)) } ))
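// each of the 10000 blocks above gets 3 preferred machines (replica locations) drawn at random,
// with possible repeats, from the pool of 50 machine names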
val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)
// test that you get over 95% locality in each group
val minLocality = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
.foldLeft(100.)( (perc, loc) => math.min(perc,loc) )
assert(minLocality > 0.95)
// test that the groups are load balanced with 100 +/- 15 elements in each
val maxImbalance = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
assert(maxImbalance < 15)
}
test("zipped RDDs") {