diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index f999e9b0ec..61c4d0c004 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -36,7 +36,21 @@ private[spark] case class CoalescedRDDPartition(
     oos.defaultWriteObject()
   }
 
+  /**
+   * Gets the preferred location for this coalesced RDD partition.
+   * Most of the parent partitions should prefer this machine.
+   * @return the preferred location
+   */
   def getPreferredLocation = prefLoc
+
+  /**
+   * Computes the fraction of the parent partitions that have getPreferredLocation
+   * as one of their preferredLocations.
+   * @return locality of this coalesced partition, between 0 and 1
+   */
+  def localFraction: Double = {
+    var loc: Int = 0
+    parents.foreach(p => if (rdd.preferredLocations(p).contains(getPreferredLocation)) loc += 1)
+    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+  }
 }
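To make the new metric concrete, here is a hedged usage sketch that mirrors what the updated RDDSuite does further down. `sc` and `data` (an RDD whose parent partitions carry preferred locations, e.g. HDFS blocks) are assumed, and since CoalescedRDDPartition is private[spark], the cast only compiles from inside the spark package:

```scala
import spark.rdd.CoalescedRDDPartition

// Assumed: `data` is an RDD whose parent partitions have preferredLocations.
val coalesced = data.coalesce(10) // no shuffle, so splits are CoalescedRDDPartitions
coalesced.partitions.foreach { split =>
  val part = split.asInstanceOf[CoalescedRDDPartition]
  // localFraction is in [0, 1]: the share of parent partitions that list this
  // group's preferred machine among their own preferred locations
  println(part.getPreferredLocation + ": " + part.localFraction)
}
```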
@@ -55,13 +69,9 @@ class CoalescedRDD[T: ClassManifest](
 
   override def getPartitions: Array[Partition] = {
     val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
-    val targetLen = math.min(prev.partitions.length, maxPartitions)
     val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
 
-    packer.setupGroups(targetLen)   // setup the groups (bins) and preferred locations
-    packer.throwBalls()             // assign partitions (balls) to each group (bins)
-
-    for ((pg, i) <- packer.groupArr.zipWithIndex) {
+    for ((pg, i) <- packer.getPartitions.zipWithIndex) {
       val ids = pg.list.map(_.index).toArray
       res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
     }
@@ -86,27 +96,68 @@ class CoalescedRDD[T: ClassManifest](
     super.clearDependencies()
     prev = null
   }
+
+  /**
+   * Returns the preferred machine for the split. If the split is of type CoalescedRDDPartition,
+   * then the preferred machine will be one that most of its parent splits prefer too.
+   * @param split the partition to look up
+   * @return the machine most preferred by the split
+   */
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.isInstanceOf[CoalescedRDDPartition])
+      List(split.asInstanceOf[CoalescedRDDPartition].getPreferredLocation)
+    else
+      super.getPreferredLocations(split)
+  }
+
 }
 
 
 class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
+  private def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+  private def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+  private val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+  // each element of groupArr represents one coalesced partition
+  private val groupArr = mutable.ArrayBuffer[PartitionGroup]()
+
+  // hash used to check whether some machine is already in groupArr
+  private val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
+
+  // hash used for the first maxPartitions (to avoid duplicates)
+  private val initialHash = mutable.Map[Partition, Boolean]()
+
+  // determines the tradeoff between load-balancing the partitions' sizes and their locality,
+  // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+  private val slack = (balanceSlack * prev.partitions.size).toInt
+
+  private var noLocality = true // true if no preferredLocations exist for the parent RDD
+
+  this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) and preferred locations
+  this.throwBalls()                                                 // assign partitions (balls) to each group (bins)
+
+  def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.size > 0).toArray
+
   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
   // next() returns the next preferred machine that a partition is replicated on
   // the rotator first goes through the first replica copy of each partition, then second, then third
-  private class RotateLocations(prev: RDD[_]) extends Iterator[String] {
+  // the iterator's return type is a tuple: (replicaString, partition)
+  private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
 
-    private var it: Iterator[String] = resetIterator()
+    private var it: Iterator[(String, Partition)] = resetIterator()
     override val isEmpty = !it.hasNext
 
     // initializes/resets to start iterating from the beginning
     private def resetIterator() = {
       val i1 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 0) Some(prev.preferredLocations(p)(0)) else None } )
+        { if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0), p)) else None } )
       val i2 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 1) Some(prev.preferredLocations(p)(1)) else None } )
+        { if (prev.preferredLocations(p).length > 1) Some((prev.preferredLocations(p)(1), p)) else None } )
       val i3 = prev.partitions.view.map( (p: Partition) =>
-        { if (prev.preferredLocations(p).length > 2) Some(prev.preferredLocations(p)(2)) else None } )
+        { if (prev.preferredLocations(p).length > 2) Some((prev.preferredLocations(p)(2), p)) else None } )
       val res = List(i1, i2, i3)
       res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
     }
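The view/flatMap/flatten fusion in resetIterator is compact but easy to misread, so here is a self-contained sketch of the same rotation pattern on hypothetical replica data, showing the emission order (first replicas of every partition, then second, then third):

```scala
// Stand-in for prev.preferredLocations: partition index -> replica machines.
val locs = Vector(
  Vector("m1", "m2", "m3"), // partition 0
  Vector("m4"),             // partition 1 has a single replica
  Vector("m2", "m5")        // partition 2
)

// One lazy pass per replica slot, like resetIterator's i1/i2/i3 ...
def slot(r: Int) = locs.view.zipWithIndex.map {
  case (ls, p) => if (ls.length > r) Some((ls(r), p)) else None
}

// ... then fuse the passes and drop the Nones.
val fused = List(slot(0), slot(1), slot(2)).view.flatMap(x => x).flatten.iterator
fused.foreach(println) // (m1,0), (m4,1), (m2,2), (m2,0), (m5,2), (m3,0)
```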
@@ -115,7 +166,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     def hasNext(): Boolean = !isEmpty
 
     // return the next preferredLocation of some partition of the RDD
-    def next(): String = {
+    def next(): (String, Partition) = {
       if (it.hasNext)
         it.next()
       else {
@@ -126,42 +177,11 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     }
   }
 
   case class PartitionGroup(prefLoc: String = "") {
-    var list = mutable.MutableList[Partition]()
+    var list = mutable.ListBuffer[Partition]()
 
     def size = list.size
-
-    // returns number of partitions that got locality in this group
-    def local = {
-      var loc: Int = 0
-      list.foreach(p => if (prev.preferredLocations(p).contains(prefLoc)) loc += 1)
-      loc
-    }
-
-    override def toString(): String = {
-      val localityStr = if (size == 0) "0" else (local*100. / size).toInt.toString
-      "PartitionGroup(\"" + prefLoc + "\") size: " + size + " locality: " + localityStr +"% \n"
-      // list.map("\t\t" + _.toString).mkString("\n") + "\n"
-    }
   }
 
-  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
-  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
-    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
-
-  val rnd = new scala.util.Random(7919) // keep this class deterministic
-
-  // each element of groupArr represents one coalesced partition
-  val groupArr = mutable.ArrayBuffer[PartitionGroup]()
-
-  // hash used to check whether some machine is already in groupArr
-  val groupHash = mutable.Map[String, mutable.MutableList[PartitionGroup]]()
-
-  // determines the tradeoff between load-balancing the partitions sizes and their locality
-  // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
-  val slack = (balanceSlack * prev.partitions.size).toInt
-
-  private var noLocality = true // if true if no preferredLocations exists for parent RDD
-
   /**
    * Sorts and gets the least element of the list associated with key in groupHash
    * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
@@ -172,13 +192,21 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     groupHash.get(key).map(_.sortWith(compare).head)
   }
 
+  def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
+    if (!initialHash.contains(part)) {
+      pgroup.list += part           // already preassign this element, ensures every bucket will have 1 element
+      initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets/groups
+      true
+    } else false
+  }
+
   /**
    * Initializes targetLen partition groups and assigns a preferredLocation
    * This uses coupon collector to estimate how many preferredLocations it must rotate through
    * until it has seen most of the preferred locations (2 * n log(n))
    * @param targetLen
    */
-  def setupGroups(targetLen: Int) {
+  private def setupGroups(targetLen: Int) {
     val rotIt = new RotateLocations(prev)
 
     // deal with empty rotator case, just create targetLen partition groups with no preferred location
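The 2 * n * log(n) cap in the setupGroups scaladoc is the classic coupon-collector bound: drawing uniformly from n machines, the expected number of draws to see all of them is n * H(n), roughly n * ln n, so doubling leaves comfortable headroom. A small self-contained check (n here is hypothetical):

```scala
val n = 50 // hypothetical number of distinct preferred machines
val rnd = new scala.util.Random(7919)
val seen = scala.collection.mutable.Set[Int]()
var draws = 0
// Draw machines uniformly until every one of the n machines has been seen.
while (seen.size < n) {
  seen += rnd.nextInt(n)
  draws += 1
}
// Typically prints a draw count comfortably below the 2*n*log(n) cap (~391 for n = 50).
println("draws to see all " + n + ": " + draws)
```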
@@ -199,22 +227,29 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     // likely targetLen >> number of preferred locations (more buckets than there are machines)
     while (numCreated < targetLen && tries < expectedCoupons2) {
       tries += 1
-      val nxt = rotIt.next()
-      if (!groupHash.contains(nxt)) {
-        val pgroup = PartitionGroup(nxt)
+      val (nxt_replica, nxt_part) = rotIt.next()
+      if (!groupHash.contains(nxt_replica)) {
+        val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
-        groupHash += (nxt -> (mutable.MutableList(pgroup))) // use list in case we have multiple groups for same machine
+        addPartToPGroup(nxt_part, pgroup)
+        groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple groups per machine
         numCreated += 1
       }
     }
 
     while (numCreated < targetLen) { // if we don't have enough partition groups, just create duplicates
-      val nxt = rotIt.next()
-      val pgroup = PartitionGroup(nxt)
+      var (nxt_replica, nxt_part) = rotIt.next()
+      val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt).get += pgroup
+      groupHash.get(nxt_replica).get += pgroup
+      var tries = 0
+      while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure each group has at least one partition
+        nxt_part = rotIt.next()._2
+        tries += 1
+      }
       numCreated += 1
     }
+
   }
 
   /**
@@ -224,7 +259,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
    * @param p partition (ball to be thrown)
    * @return partition group (bin to be put in)
    */
-  def pickBin(p: Partition): PartitionGroup = {
+  private def pickBin(p: Partition): PartitionGroup = {
     val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
     val prefPart = if (pref == Nil) None else pref.head
 
@@ -243,7 +278,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
     }
   }
 
-  def throwBalls() {
+  private def throwBalls() {
     if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
       if (maxPartitions > groupArr.size) { // just return prev.partitions
         for ((p,i) <- prev.partitions.zipWithIndex) {
@@ -257,7 +292,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
         }
       }
     } else {
-      for (p <- prev.partitions) { // throw every partition (ball) into a partition group (bin)
+      for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into a partition group
         pickBin(p).list += p
       }
     }
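Most of the pickBin body is unchanged context, so only its signature appears in the hunks above; what `slack` buys is easiest to see in isolation. A hedged sketch of the locality-vs-balance rule the comments describe, with illustrative names (`balanceCandidate` stands in for the least loaded of the sampled groups, not the exact pickBin code):

```scala
case class Group(prefLoc: String, size: Int)

// With balanceSlack = 0.10 over 100 parent partitions, slack = 10.
val slack = (0.10 * 100).toInt

def choose(localCandidate: Option[Group], balanceCandidate: Group): Group =
  localCandidate match {
    // locality wins while the local group is at most `slack` elements bigger
    case Some(local) if balanceCandidate.size + slack > local.size => local
    // no local group, or imbalance beyond the slack: balance wins
    case _ => balanceCandidate
  }

println(choose(Some(Group("m1", 12)), Group("m7", 5))) // Group(m1,12): within slack
println(choose(Some(Group("m1", 20)), Group("m7", 5))) // Group(m7,5): too imbalanced
```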
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 881bdedfe5..c200bfe909 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.time.{Span, Millis}
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd._
+import scala.collection.parallel.mutable
 
 class RDDSuite extends FunSuite with SharedSparkContext {
 
@@ -184,16 +185,35 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // descent balance (2+ per bin)
 
-    val prefs = List(List("m1","m2","m3"), List("m4","m5","m6"))
-    val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs
-    val coalesced2 = data2.coalesce(10)
-    val splits2 = coalesced2.glom().collect().map(_.toList).toList
+    // If we try to coalesce into more partitions than the original RDD, it should just
+    // keep the original number of partitions.
+    val coalesced4 = data.coalesce(20)
+    assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
+      (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) ===
+      (1 to 9).map(x => List(x)).toList)
 
-    // this gives a list of pairs, each pair is of the form (even,odd), where even is the number of even elements...
-    val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) )
-    val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the maxs, this represents the locality
-    maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition
+    // large scale experiment
+    import collection.mutable
+    val rnd = scala.util.Random
+    val partitions = 10000
+    val numMachines = 50
+    val machines = mutable.ListBuffer[String]()
+    (1 to numMachines).foreach(machines += "m" + _)
+
+    val blocks = (1 to partitions).map(i => (i, (i to (i + 2)).map { j => machines(rnd.nextInt(machines.size)) }))
+
+    val data2 = sc.makeRDD(blocks)
+    val coalesced2 = data2.coalesce(numMachines * 2)
+
+    // test that you get over 95% locality in each group
+    val minLocality = coalesced2.partitions.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+      .foldLeft(1.0)((perc, loc) => math.min(perc, loc))
+    assert(minLocality > 0.95)
+
+    // test that the groups are load balanced with 100 +/- 15 elements in each
+    val maxImbalance = coalesced2.partitions.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+      .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
+    assert(maxImbalance < 15)
   }
 
   test("zipped RDDs") {
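For intuition on the bounds the large-scale test asserts, a hedged balls-into-bins simulation with the same numbers (10000 partitions, 100 groups, mean 100 per group): purely random placement typically already stays within roughly +/- 30 of the mean, and the coalescer's least-loaded placement is what tightens that to the +/- 15 asserted above.

```scala
val rnd = new scala.util.Random(7919)
val bins = new Array[Int](100)
// Throw 10000 balls into 100 bins uniformly at random ...
(1 to 10000).foreach(_ => bins(rnd.nextInt(100)) += 1)
// ... and report the worst deviation from the mean of 100.
println("max deviation from mean: " + bins.map(c => math.abs(100 - c)).max)
```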