diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index e099669c22..30566adba3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -25,6 +25,9 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.BitSet + + import org.apache.spark._ import org.apache.spark.rdd._ import org.apache.spark.SparkContext._ @@ -69,7 +72,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI */ class IndexedRDD[K: ClassManifest, V: ClassManifest]( @transient val index: RDDIndex[K], - @transient val valuesRDD: RDD[ Seq[Seq[V]] ]) + @transient val valuesRDD: RDD[ (Array[V], BitSet) ]) extends RDD[(K, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { @@ -110,6 +113,195 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } + + + def zipJoinRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = { + assert(index == other.index) + index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + (thisIndexIter, thisIter, otherIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newBS = thisBS & otherBS + + index.iterator.flatMap{ case (k,i) => + if(newBS(i)) List((k, (thisValues(i), otherValues(i)))) + else List.empty + } + } + } + + def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = { + assert(index == other.index) + val newValuesRDD = valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newBS = thisBS & otherBS + val newValues = new Array[(V,W)](thisValues.size) + for( i <- newBS ) { + newValues(i) = (thisValues(i), otherValues(i)) + } + List((newValues, newBS)).iterator + } + new IndexedRDD(index, newValuesRDD) + } + + + def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest]( + other: RDD[(K,W)])( + f: (K, V, W) => Z, + merge: (Z,Z) => Z = (a:Z, b:Z) => a): + IndexedRDD[K,Z] = { + val cleanF = index.rdd.context.clean(f) + val cleanMerge = index.rdd.context.clean(merge) + other match { + case other: IndexedRDD[_, _] if index == other.index => { + val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + (thisIndexIter, thisIter, otherIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newValues = new Array[Z](thisValues.size) + val newBS = thisBS & otherBS + for( (k,i) <- index ) { + if (newBS(i)) { + newValues(i) = cleanF(k, thisValues(i), otherValues(i)) + } + } + List((newValues, newBS)).iterator + } + new IndexedRDD(index, newValues) + } + + case _ => { + // Get the partitioner from the index + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == 
Some(partitioner)) other + else other.partitionBy(partitioner) + + val newValues = index.rdd.zipPartitions(valuesRDD, other) { + (thisIndexIter, thisIter, tuplesIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + + val newValues = new Array[Z](thisValues.size) + // track which values are matched with values in other + val tempBS = new BitSet(thisValues.size) + + for( (k, w) <- tuplesIter if index.contains(k) ) { + val ind = index.get(k) + if(thisBS(ind)) { + val result = cleanF(k, thisValues(ind), w) + if(tempBS(ind)) { + newValues(ind) = cleanMerge(newValues(ind), result) + } else { + newValues(ind) = result + tempBS(ind) = true + } + } + } + List((newValues, tempBS)).iterator + } // end of newValues + new IndexedRDD(index, newValues) + } + } + } + + + + def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest]( + other: RDD[(K,W)])( + f: (K, V, Option[W]) => Z, + merge: (Z,Z) => Z = (a:Z, b:Z) => a): + IndexedRDD[K,Z] = { + val cleanF = index.rdd.context.clean(f) + val cleanMerge = index.rdd.context.clean(merge) + other match { + case other: IndexedRDD[_, _] if index == other.index => { + val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + (thisIndexIter, thisIter, otherIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newValues = new Array[Z](thisValues.size) + for( (k,i) <- index ) { + if (thisBS(i)) { + val otherVal = if(otherBS(i)) Some(otherValues(i)) else None + newValues(i) = cleanF(k, thisValues(i), otherVal) + } + } + List((newValues, thisBS)).iterator + } + new IndexedRDD(index, newValues) + } + + case _ => { + // Get the partitioner from the index + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) other + else other.partitionBy(partitioner) + val newValues = index.rdd.zipPartitions(valuesRDD, other) { + (thisIndexIter, thisIter, tuplesIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + + val newValues = new Array[Z](thisValues.size) + // track which values are matched with values in other + val tempBS = new BitSet(thisValues.size) + + for( (k, w) <- tuplesIter if index.contains(k) ) { + val ind = index.get(k) + if(thisBS(ind)) { + val result = cleanF(k, thisValues(ind), Option(w)) + if(tempBS(ind)) { + newValues(ind) = cleanMerge(newValues(ind), result) + } else { + newValues(ind) = result + tempBS(ind) = true + } + } + } + + // Process remaining keys in lef join + for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) { + newValues(ind) = cleanF(k, thisValues(ind), None) + } + List((newValues, thisBS)).iterator + } // end of newValues + new IndexedRDD(index, newValues) + } + } + } + + + + /** * The IndexedRDD has its own optimized version of the pairRDDFunctions. 
*/ @@ -120,19 +312,41 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } + override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = { + val cleanF = index.rdd.context.clean(f) + val newValues = index.rdd.zipPartitions(valuesRDD){ + (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val (oldValues, bs) = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newValues: Array[V] = oldValues.clone().asInstanceOf[Array[V]] + val newBS = new BitSet(oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + if ( bs(i) && f( (k, oldValues(i)) ) ) { newBS(i) = true } + else { newValues(i) = null.asInstanceOf[V] } + } + Array((newValues, newBS)).iterator + } + new IndexedRDD[K,V](index, newValues) + } + + /** * Provide the RDD[(K,V)] equivalent output. */ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { - tuples.compute(part, context).flatMap { case (indexMap, values) => + tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => // Walk the index to construct the key, value pairs - indexMap.iterator + indexMap.iterator // Extract rows with key value pairs and indicators - .map{ case (k, ind) => (k, values(ind)) } + .map{ case (k, ind) => (bs(ind), k, ind) } // Remove tuples that aren't actually present in the array - .filter{ case (_, valar) => valar != null && !valar.isEmpty()} + .filter( _._1 ) // Extract the pair (removing the indicator from the tuple) - .flatMap{ case (k, valar) => valar.map(v => (k,v))} + .map( x => (x._2, values(x._3) ) ) } } @@ -143,82 +357,100 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( object IndexedRDD { - /** - * Construct an IndexedRDD from a regular RDD[(K,V)] using an existing index - * if one is provided. - */ + + def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] = + apply(rdd, (a:V, b:V) => a ) + def apply[K: ClassManifest, V: ClassManifest]( - tbl: RDD[(K,V)], - existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = { + rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = { + // Preaggregate and shuffle if necessary + // Preaggregation. 
+ val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + val partitioner = new HashPartitioner(rdd.partitions.size) + val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) - if (existingIndex == null) { // If no index was provided - // Shuffle the table (if necessary) - val shuffledTbl = - if (tbl.partitioner.isEmpty) { - new ShuffledRDD[K, V, (K,V)](tbl, Partitioner.defaultPartitioner(tbl)) - } else { tbl } - - val groups = shuffledTbl.mapPartitions( iter => { - val indexMap = new BlockIndex[K]() - val values = new ArrayBuffer[Seq[V]]() - for ((k,v) <- iter){ - if(!indexMap.contains(k)) { - val ind = indexMap.size - indexMap.put(k, ind) - values.append(ArrayBuffer.empty[V]) - } - val ind = indexMap.get(k) - values(ind).asInstanceOf[ArrayBuffer[V]].append(v) - } - List((indexMap, values.toSeq)).iterator - }, true).cache - // extract the index and the values - val index = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - new IndexedRDD[K,V](new RDDIndex(index), values) - } else { - val index = existingIndex - val partitioner = index.rdd.partitioner match { - case Some(p) => p - case None => throw new SparkException("An index must have a partitioner.") - } - - // Shuffle the table according to the index (if necessary) - val shuffledTbl = - if (tbl.partitioner == Some(partitioner)) { - tbl + val groups = preAgg.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + val values = new ArrayBuffer[V]() + val bs = new BitSet + for ((k,v) <- iter) { + if(!indexMap.contains(k)) { + val ind = indexMap.size + indexMap.put(k, ind) + values.append(v) + bs(ind) = true } else { - new ShuffledRDD[K, V, (K,V)](tbl, partitioner) + val ind = indexMap.get(k) + values(ind) = reduceFunc(values(ind), v) } - - // Use the index to build the new values table - val values = index.rdd.zipPartitions(shuffledTbl)( - (indexIter, tblIter) => { - // There is only one map - val index = indexIter.next() - assert(!indexIter.hasNext()) - val values = new Array[Seq[V]](index.size) - for ((k,v) <- tblIter) { - if (!index.contains(k)) { - throw new SparkException("Error: Trying to bind an external index " + - "to an RDD which contains keys that are not in the index.") - } - val ind = index(k) - if (values(ind) == null) { - values(ind) = ArrayBuffer.empty[V] - } - values(ind).asInstanceOf[ArrayBuffer[V]].append(v) - } - List(values.toSeq).iterator - }) - - new IndexedRDD[K,V](index, values) - } + } + List( (indexMap, (values.toArray, bs)) ).iterator + }, true).cache + // extract the index and the values + val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) + val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + new IndexedRDD[K,V](new RDDIndex(index), values) } - def reduceByKey[K: ClassManifest, V: ClassManifest]( - rdd: RDD[(K,V)], reduceFun: (V, V) => V, index: RDDIndex[K]): IndexedRDD[K,V] = { + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] = + apply(rdd, index, (a:V,b:V) => a) + + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K], + reduceFunc: (V, V) => V): IndexedRDD[K,V] = + apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc) + // { + // // Get the index Partitioner + // val partitioner = index.rdd.partitioner match { + // case Some(p) => p + // case None => throw new SparkException("An index must have a partitioner.") + // } + // // 
Preaggregate and shuffle if necessary + // val partitioned = + // if (rdd.partitioner != Some(partitioner)) { + // // Preaggregation. + // val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + // rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) + // } else { + // rdd + // } + + // // Use the index to build the new values table + // val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + // // There is only one map + // val index = indexIter.next() + // assert(!indexIter.hasNext()) + // val values = new Array[V](index.size) + // val bs = new BitSet(index.size) + // for ((k,v) <- tblIter) { + // if (!index.contains(k)) { + // throw new SparkException("Error: Trying to bind an external index " + + // "to an RDD which contains keys that are not in the index.") + // } + // val ind = index(k) + // if (bs(ind)) { + // values(ind) = reduceFunc(values(ind), v) + // } else { + // values(ind) = v + // bs(ind) = true + // } + // } + // List((values, bs)).iterator + // }) + // new IndexedRDD[K,V](index, values) + // } // end of apply + + + def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + rdd: RDD[(K,V)], + index: RDDIndex[K], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C): IndexedRDD[K,C] = { // Get the index Partitioner val partitioner = index.rdd.partitioner match { case Some(p) => p @@ -228,11 +460,11 @@ object IndexedRDD { val partitioned = if (rdd.partitioner != Some(partitioner)) { // Preaggregation. - val aggregator = new Aggregator[K, V, V](v => v, reduceFun, reduceFun) - val combined = rdd.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) - combined.partitionBy(partitioner) //new ShuffledRDD[K, V, (K, V)](combined, partitioner) + val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, + mergeCombiners) + rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) } else { - rdd + rdd.mapValues(x => createCombiner(x)) } // Use the index to build the new values table @@ -240,25 +472,26 @@ object IndexedRDD { // There is only one map val index = indexIter.next() assert(!indexIter.hasNext()) - val values = new Array[Array[V]](index.size) - for ((k,v) <- tblIter) { + val values = new Array[C](index.size) + val bs = new BitSet(index.size) + for ((k,c) <- tblIter) { if (!index.contains(k)) { throw new SparkException("Error: Trying to bind an external index " + "to an RDD which contains keys that are not in the index.") } val ind = index(k) - if (values(ind) == null) { - values(ind) = Array(v) + if (bs(ind)) { + values(ind) = mergeCombiners(values(ind), c) } else { - values(ind)(0) = reduceFun(values(ind).head, v) + values(ind) = c + bs(ind) = true } } - List(values.view.map(x => if (x != null) x.toSeq else null ).toSeq).iterator + List((values, bs)).iterator }) + new IndexedRDD(index, values) + } // end of apply - new IndexedRDD[K,V](index, values) - - } /** * Construct and index of the unique values in a given RDD. 
@@ -278,9 +511,7 @@ object IndexedRDD { } case Some(partitioner) => tbl.partitionBy(partitioner) -// new ShuffledRDD[K, Boolean](tbl, partitioner) } - val index = shuffledTbl.mapPartitions( iter => { val indexMap = new BlockIndex[K]() diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala index 358ab57b0c..e497ef691f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala @@ -22,6 +22,8 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.BitSet + import org.apache.spark._ @@ -41,11 +43,15 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.map(x => f(x)) - }), true) - new IndexedRDD[K,U](self.index, newValues) + val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ + case (values, bs) => + val newValues = new Array[U](values.size) + for ( ind <- bs ) { + newValues(ind) = f(values(ind)) + } + (newValues, bs) + }, preservesPartitioning = true) + new IndexedRDD[K,U](self.index, newValuesRDD) } @@ -55,20 +61,19 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) => + val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ + (keysIter, valuesIter) => val index = keysIter.next() assert(keysIter.hasNext() == false) - val oldValues = valuesIter.next() + val (oldValues, bs) = valuesIter.next() assert(valuesIter.hasNext() == false) - // Allocate the array to store the results into - val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size) + // Allocate the array to store the results into + val newValues: Array[U] = new Array[U](oldValues.size) // Populate the new Values for( (k,i) <- index ) { - if(oldValues(i) != null) { - newValues(i) = oldValues(i).map( v => f(k,v) ) - } + if (bs(i)) { newValues(i) = f(k, oldValues(i)) } } - Array(newValues.toSeq).iterator + Array((newValues, bs)).iterator } new IndexedRDD[K,U](self.index, newValues) } @@ -81,11 +86,20 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.flatMap(x => f(x)) - }), true) - new IndexedRDD[K,U](self.index, newValues) + val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ + case (values, bs) => + val newValues = new Array[U](values.size) + val newBS = new BitSet(values.size) + for ( ind <- bs ) { + val res = f(values(ind)) + if(!res.isEmpty) { + newValues(ind) = res.toIterator.next() + newBS(ind) = true + } + } + (newValues, newBS) + }, preservesPartitioning = true) + new IndexedRDD[K,U](self.index, newValuesRDD) } @@ -105,31 +119,18 @@ class IndexedRDDFunctions[K: ClassManifest, V: 
ClassManifest](self: IndexedRDD[K partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)] = { - val newValues = self.valuesRDD.mapPartitions( - _.map{ groups: Seq[Seq[V]] => - groups.map{ group: Seq[V] => - if (group != null && !group.isEmpty) { - val c: C = createCombiner(group.head) - val sum: C = group.tail.foldLeft(c)(mergeValue) - Seq(sum) - } else { - null - } - } - }, true) - new IndexedRDD[K,C](self.index, newValues) + mapValues(createCombiner) } - - /** - * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the existing partitioner/parallelism level. - */ - override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { - val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) - new IndexedRDD[K, Seq[V]](self.index, newValues) - } + // /** + // * Group the values for each key in the RDD into a single sequence. Hash-partitions the + // * resulting RDD with the existing partitioner/parallelism level. + // */ + // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + // val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) + // new IndexedRDD[K, Seq[V]](self.index, newValues) + // } /** @@ -146,23 +147,24 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K // However it is possible that both RDDs are missing a value for a given key in // which case the returned RDD should have a null value val newValues = - self.valuesRDD.zipPartitions(other.valuesRDD)( - (thisIter, otherIter) => { - val thisValues: Seq[Seq[V]] = thisIter.next() + self.valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) - val otherValues: Seq[Seq[W]] = otherIter.next() - assert(!otherIter.hasNext) - // Zip the values and if both arrays are null then the key is not present and - // so the resulting value must be null (not a tuple of empty sequences) - val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{ - case (null, null) => null // The key is not present in either RDD - case (a, null) => Seq((a, Seq.empty[W])) - case (null, b) => Seq((Seq.empty[V], b)) - case (a, b) => Seq((a,b)) - }.toSeq - List(tmp).iterator - }) - new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + + val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) + val newBS = thisBS | otherBS + + for( ind <- newBS ) { + val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + newValues(ind) = (a, b) + } + List((newValues, newBS)).iterator + } + new IndexedRDD(self.index, newValues) } case other: IndexedRDD[_, _] if self.index.rdd.partitioner == other.index.rdd.partitioner => { @@ -197,26 +199,33 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K val newIndex = newIndexIter.next() assert(!newIndexIter.hasNext) // Get the corresponding indicies and values for this and the other IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext) - val (otherIndex, otherValues) = otherTuplesIter.next() + val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() assert(!otherTuplesIter.hasNext) // Preallocate the new Values array - 
val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size) + val newValues = new Array[(Seq[V], Seq[W])](newIndex.size) + val newBS = new BitSet(newIndex.size) + // Lookup the sequences in both submaps for ((k,ind) <- newIndex) { - val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null - val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null - // if either of the sequences is not null then the key was in one of the two tables - // and so the value should appear in the returned table - newValues(ind) = (thisSeq, otherSeq) match { - case (null, null) => null - case (a, null) => Seq( (a, Seq.empty[W]) ) - case (null, b) => Seq( (Seq.empty[V], b) ) - case (a, b) => Seq( (a,b) ) + // Get the left key + val a = if (thisIndex.contains(k)) { + val ind = thisIndex.get(k) + if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + } else Seq.empty[V] + // Get the right key + val b = if (otherIndex.contains(k)) { + val ind = otherIndex.get(k) + if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + } else Seq.empty[W] + // If at least one key was present then we generate a tuple. + if (!a.isEmpty || !b.isEmpty) { + newValues(ind) = (a, b) + newBS(ind) = true } } - List(newValues.toSeq).iterator + List((newValues, newBS)).iterator }) new IndexedRDD(new RDDIndex(newIndex), newValues) } @@ -238,44 +247,48 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K self.tuples.zipPartitions(otherShuffled)( (thisTuplesIter, otherTuplesIter) => { // Get the corresponding indicies and values for this IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext()) // Construct a new index val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] // Construct a new array Buffer to store the values - val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null) + val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) + val newBS = new BitSet(thisValues.size) // populate the newValues with the values in this IndexedRDD for ((k,i) <- thisIndex) { - if (thisValues(i) != null) { - newValues(i) = (thisValues(i), ArrayBuffer.empty[W]) + if (thisBS(i)) { + newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) + newBS(i) = true } } // Now iterate through the other tuples updating the map for ((k,w) <- otherTuplesIter){ - if (!newIndex.contains(k)) { + if (newIndex.contains(k)) { + val ind = newIndex.get(k) + if(newBS(ind)) { + newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) + } else { + // If the other key was in the index but not in the values + // of this indexed RDD then create a new values entry for it + newBS(ind) = true + newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) + } + } else { // update the index val ind = newIndex.size newIndex.put(k, ind) + newBS(ind) = true // Update the values - newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) - } else { - val ind = newIndex.get(k) - if(newValues(ind) == null) { - // If the other key was in the index but not in the values - // of this indexed RDD then create a new values entry for it - newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) - } else { - newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) - } + newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) } } - // Finalize the new values array - val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = - newValues.view.map{ - case null => null - case (s, ab) => Seq((s, ab.toSeq)) - }.toSeq - 
List( (newIndex, newValuesArray) ).iterator + // // Finalize the new values array + // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = + // newValues.view.map{ + // case null => null + // case (s, ab) => Seq((s, ab.toSeq)) + // }.toSeq + List( (newIndex, (newValues.toArray, newBS)) ).iterator }).cache() // Extract the index and values from the above RDD diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 5d00917dab..8a66297f6f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -710,16 +710,19 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) def values: RDD[V] = self.map(_._2) + + def indexed(): IndexedRDD[K,V] = IndexedRDD(self) + def indexed(numPartitions: Int): IndexedRDD[K,V] = IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions))) def indexed(partitioner: Partitioner): IndexedRDD[K,V] = IndexedRDD(self.partitionBy(partitioner)) - - def indexed(existingIndex: RDDIndex[K] = null): IndexedRDD[K,V] = + def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] = IndexedRDD(self, existingIndex) + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index 3d6724ba0f..dbfccde8b9 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -7,7 +7,6 @@ import org.apache.spark.graph._ /** * A partition of edges in 3 large columnar arrays. 
*/ -private[graph] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( val srcIds: Array[Vid], val dstIds: Array[Vid], diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala index f2d07d55c6..cc3a443fa2 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -4,7 +4,7 @@ import scala.collection.mutable.ArrayBuilder import org.apache.spark.graph._ -private[graph] +//private[graph] class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]{ val srcIds = new VertexArrayList @@ -20,7 +20,7 @@ ED: ClassManifest]{ } def toEdgePartition: EdgePartition[ED] = { - new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result()) + new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result()) } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 73538862b1..b6758b0501 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -3,6 +3,7 @@ package org.apache.spark.graph.impl import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext._ import org.apache.spark.Partitioner @@ -72,7 +73,8 @@ object EdgeTripletBuilder { new EdgeTripletIterator(vmap, edgePartition) } ClosureCleaner.clean(iterFun) - vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition + vTableReplicated.zipJoinRDD(eTable) + .mapPartitions( iterFun ) // end of map partition } } @@ -83,7 +85,7 @@ object EdgeTripletBuilder { */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vTable: IndexedRDD[Vid, VD], - @transient val vid2pid: IndexedRDD[Vid, Pid], + @transient val vid2pid: IndexedRDD[Vid, Array[Pid]], @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]]) extends Graph[VD, ED] { @@ -144,7 +146,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val numVertices = this.numVertices val numEdges = this.numEdges val replicationRatio = - vid2pid.groupByKey().map(kv => kv._2.size).sum / vTable.count + vid2pid.map(kv => kv._2.size).sum / vTable.count val loadArray = eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges) val minLoad = loadArray.min @@ -342,7 +344,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( ClosureCleaner.clean(reduceFunc) // Map and preaggregate - val preAgg = vTableReplicated.join(eTable).flatMap{ + val preAgg = vTableReplicated.zipJoinRDD(eTable).flatMap{ case (pid, (vmap, edgePartition)) => val aggMap = new VertexHashMap[A] val et = new EdgeTriplet[VD, ED] @@ -363,19 +365,15 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( }.partitionBy(vTable.index.rdd.partitioner.get) // do the final reduction reusing the index map - IndexedRDD.reduceByKey(preAgg, reduceFunc, vTable.index) + IndexedRDD(preAgg, vTable.index, reduceFunc) } override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] = { - ClosureCleaner.clean(updateF) - - val newVTable = 
vTable.leftOuterJoin(updates).mapValuesWithKeys{ - case (vid, (data, other)) => updateF(vid, data, other) - }.asInstanceOf[IndexedRDD[Vid,VD2]] + val newVTable = vTable.zipJoinLeftWithKeys(updates)(updateF) new GraphImpl(newVTable, vid2pid, eTable) } @@ -451,34 +449,38 @@ object GraphImpl { val data = message.data builder.add(data._1, data._2, data._3) } - Iterator((pid, builder.toEdgePartition)) + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) }, preservesPartitioning = true).indexed() } protected def createVid2Pid[ED: ClassManifest]( eTable: IndexedRDD[Pid, EdgePartition[ED]], - vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Pid] = { - eTable.mapPartitions { iter => + vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = { + val preAgg = eTable.mapPartitions { iter => val (pid, edgePartition) = iter.next() val vSet = new VertexSet edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)}) vSet.iterator.map { vid => (vid.toLong, pid) } - }.indexed(vTableIndex) + } + IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, + (p: Pid) => ArrayBuffer(p), + (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab}, + (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b) + .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]] } protected def createVTableReplicated[VD: ClassManifest, ED: ClassManifest]( - vTable: IndexedRDD[Vid, VD], vid2pid: IndexedRDD[Vid, Pid], + vTable: IndexedRDD[Vid, VD], vid2pid: IndexedRDD[Vid, Array[Pid]], eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexHashMap[VD]] = { // Join vid2pid and vTable, generate a shuffle dependency on the joined // result, and get the shuffle id so we can use it on the slave. - vTable.cogroup(vid2pid) - .flatMap { case (vid, (vdatas, pids)) => - pids.iterator.map { - pid => MessageToPartition(pid, (vid, vdatas.head)) - } + vTable.zipJoinRDD(vid2pid) + .flatMap { case (vid, (vdata, pids)) => + pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) } } .partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner .mapPartitionsWithIndex( (pid, iter) => {