Introducing unique indexedrdd and adding numerous specialized joins
commit 57ac9073ae

@@ -25,6 +25,9 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
+
+import scala.collection.mutable.BitSet
+
 
 import org.apache.spark._
 import org.apache.spark.rdd._
 import org.apache.spark.SparkContext._
@@ -69,7 +72,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
  */
 class IndexedRDD[K: ClassManifest, V: ClassManifest](
     @transient val index: RDDIndex[K],
-    @transient val valuesRDD: RDD[ Seq[Seq[V]] ])
+    @transient val valuesRDD: RDD[ (Array[V], BitSet) ])
   extends RDD[(K, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
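This signature change is the core of the commit: each value partition moves from a Seq[Seq[V]] (one sequence per key slot) to a dense Array[V] plus a BitSet recording which slots actually hold a value. A minimal, Spark-free sketch of the layout (the object name and data are illustrative, not part of the commit):

import scala.collection.mutable.BitSet

object BlockLayoutSketch extends App {
  val thisValues  = Array(10, 20, 30, 40)
  val thisBS      = BitSet(0, 1, 3)           // slots 0, 1, 3 hold values
  val otherValues = Array("a", "b", "c", "d")
  val otherBS     = BitSet(1, 2, 3)           // slots 1, 2, 3 hold values

  // Two tables sharing an index join by intersecting their bitsets:
  val newBS = thisBS & otherBS
  for (i <- newBS) println((i, (thisValues(i), otherValues(i))))  // slots 1 and 3
}

Because every IndexedRDD built against the same RDDIndex stores a key's value at the same slot, the joins added below never rehash keys; they intersect or union bitsets and index directly into the arrays.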
@@ -110,6 +113,195 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 }
 
 
+  def zipJoinRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
+    assert(index == other.index)
+    index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+      (thisIndexIter, thisIter, otherIter) =>
+      val index = thisIndexIter.next()
+      assert(!thisIndexIter.hasNext)
+      val (thisValues, thisBS) = thisIter.next()
+      assert(!thisIter.hasNext)
+      val (otherValues, otherBS) = otherIter.next()
+      assert(!otherIter.hasNext)
+      val newBS = thisBS & otherBS
+
+      index.iterator.flatMap{ case (k,i) =>
+        if(newBS(i)) List((k, (thisValues(i), otherValues(i))))
+        else List.empty
+      }
+    }
+  }
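A hedged usage sketch of zipJoinRDD (the data, app name, and use of the indexed() helper added to PairRDDFunctions later in this commit are illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ZipJoinRDDSketch extends App {
  val sc = new SparkContext("local", "zip-join-rdd-sketch")
  val ranks = sc.parallelize(Seq((1L, 0.15), (2L, 0.3))).indexed()
  // Build the second table against the first table's index so that the
  // assert(index == other.index) precondition holds:
  val degrees = sc.parallelize(Seq((1L, 4), (2L, 7))).indexed(ranks.index)
  ranks.zipJoinRDD(degrees).collect().foreach(println)  // (1,(0.15,4)), (2,(0.3,7))
  sc.stop()
}

zipJoin, next, performs the same intersection but returns an IndexedRDD that reuses the shared index, so the result can keep participating in shuffle-free joins.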
+
+  def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
+    assert(index == other.index)
+    val newValuesRDD = valuesRDD.zipPartitions(other.valuesRDD){
+      (thisIter, otherIter) =>
+      val (thisValues, thisBS) = thisIter.next()
+      assert(!thisIter.hasNext)
+      val (otherValues, otherBS) = otherIter.next()
+      assert(!otherIter.hasNext)
+      val newBS = thisBS & otherBS
+      val newValues = new Array[(V,W)](thisValues.size)
+      for( i <- newBS ) {
+        newValues(i) = (thisValues(i), otherValues(i))
+      }
+      List((newValues, newBS)).iterator
+    }
+    new IndexedRDD(index, newValuesRDD)
+  }
+
+
+  def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
+      other: RDD[(K,W)])(
+      f: (K, V, W) => Z,
+      merge: (Z,Z) => Z = (a:Z, b:Z) => a):
+    IndexedRDD[K,Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    val cleanMerge = index.rdd.context.clean(merge)
+    other match {
+      case other: IndexedRDD[_, _] if index == other.index => {
+        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+          (thisIndexIter, thisIter, otherIter) =>
+          val index = thisIndexIter.next()
+          assert(!thisIndexIter.hasNext)
+          val (thisValues, thisBS) = thisIter.next()
+          assert(!thisIter.hasNext)
+          val (otherValues, otherBS) = otherIter.next()
+          assert(!otherIter.hasNext)
+          val newValues = new Array[Z](thisValues.size)
+          val newBS = thisBS & otherBS
+          for( (k,i) <- index ) {
+            if (newBS(i)) {
+              newValues(i) = cleanF(k, thisValues(i), otherValues(i))
+            }
+          }
+          List((newValues, newBS)).iterator
+        }
+        new IndexedRDD(index, newValues)
+      }
+
+      case _ => {
+        // Get the partitioner from the index
+        val partitioner = index.rdd.partitioner match {
+          case Some(p) => p
+          case None => throw new SparkException("An index must have a partitioner.")
+        }
+        // Shuffle the other RDD using the partitioner for this index
+        val otherShuffled =
+          if (other.partitioner == Some(partitioner)) other
+          else other.partitionBy(partitioner)
+
+        val newValues = index.rdd.zipPartitions(valuesRDD, otherShuffled) {
+          (thisIndexIter, thisIter, tuplesIter) =>
+          val index = thisIndexIter.next()
+          assert(!thisIndexIter.hasNext)
+          val (thisValues, thisBS) = thisIter.next()
+          assert(!thisIter.hasNext)
+
+          val newValues = new Array[Z](thisValues.size)
+          // track which values are matched with values in other
+          val tempBS = new BitSet(thisValues.size)
+
+          for( (k, w) <- tuplesIter if index.contains(k) ) {
+            val ind = index.get(k)
+            if(thisBS(ind)) {
+              val result = cleanF(k, thisValues(ind), w)
+              if(tempBS(ind)) {
+                newValues(ind) = cleanMerge(newValues(ind), result)
+              } else {
+                newValues(ind) = result
+                tempBS(ind) = true
+              }
+            }
+          }
+          List((newValues, tempBS)).iterator
+        } // end of newValues
+        new IndexedRDD(index, newValues)
+      }
+    }
+  }
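A hedged sketch of zipJoinWithKeys (illustrative data and names): the other side is a plain RDD that may repeat keys, and merge folds the per-key results together.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ZipJoinWithKeysSketch extends App {
  val sc = new SparkContext("local", "zip-join-with-keys-sketch")
  val ranks = sc.parallelize(Seq((1L, 0.15), (2L, 0.3))).indexed()
  val msgs  = sc.parallelize(Seq((1L, 2.0), (1L, 3.0), (2L, 5.0)))
  val contrib = ranks.zipJoinWithKeys(msgs)(
    (vid, rank, msg) => rank * msg,   // f: combine the indexed value with one message
    (a, b) => a + b)                  // merge: fold results for duplicate keys
  contrib.collect().foreach(println)  // (1,0.75), (2,1.5)
  sc.stop()
}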
+
+
+  def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
+      other: RDD[(K,W)])(
+      f: (K, V, Option[W]) => Z,
+      merge: (Z,Z) => Z = (a:Z, b:Z) => a):
+    IndexedRDD[K,Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    val cleanMerge = index.rdd.context.clean(merge)
+    other match {
+      case other: IndexedRDD[_, _] if index == other.index => {
+        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+          (thisIndexIter, thisIter, otherIter) =>
+          val index = thisIndexIter.next()
+          assert(!thisIndexIter.hasNext)
+          val (thisValues, thisBS) = thisIter.next()
+          assert(!thisIter.hasNext)
+          val (otherValues, otherBS) = otherIter.next()
+          assert(!otherIter.hasNext)
+          val newValues = new Array[Z](thisValues.size)
+          for( (k,i) <- index ) {
+            if (thisBS(i)) {
+              val otherVal = if(otherBS(i)) Some(otherValues(i)) else None
+              newValues(i) = cleanF(k, thisValues(i), otherVal)
+            }
+          }
+          List((newValues, thisBS)).iterator
+        }
+        new IndexedRDD(index, newValues)
+      }
+
+      case _ => {
+        // Get the partitioner from the index
+        val partitioner = index.rdd.partitioner match {
+          case Some(p) => p
+          case None => throw new SparkException("An index must have a partitioner.")
+        }
+        // Shuffle the other RDD using the partitioner for this index
+        val otherShuffled =
+          if (other.partitioner == Some(partitioner)) other
+          else other.partitionBy(partitioner)
+        val newValues = index.rdd.zipPartitions(valuesRDD, otherShuffled) {
+          (thisIndexIter, thisIter, tuplesIter) =>
+          val index = thisIndexIter.next()
+          assert(!thisIndexIter.hasNext)
+          val (thisValues, thisBS) = thisIter.next()
+          assert(!thisIter.hasNext)
+
+          val newValues = new Array[Z](thisValues.size)
+          // track which values are matched with values in other
+          val tempBS = new BitSet(thisValues.size)
+
+          for( (k, w) <- tuplesIter if index.contains(k) ) {
+            val ind = index.get(k)
+            if(thisBS(ind)) {
+              val result = cleanF(k, thisValues(ind), Option(w))
+              if(tempBS(ind)) {
+                newValues(ind) = cleanMerge(newValues(ind), result)
+              } else {
+                newValues(ind) = result
+                tempBS(ind) = true
+              }
+            }
+          }
+
+          // Process the remaining keys in the left join
+          for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) {
+            newValues(ind) = cleanF(k, thisValues(ind), None)
+          }
+          List((newValues, thisBS)).iterator
+        } // end of newValues
+        new IndexedRDD(index, newValues)
+      }
+    }
+  }
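A hedged sketch of zipJoinLeftWithKeys (illustrative data and names): every key present on the indexed left side survives, and f sees None where the right side has no match — exactly the shape outerJoinVertices needs in GraphImpl below.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ZipJoinLeftSketch extends App {
  val sc = new SparkContext("local", "zip-join-left-sketch")
  val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c"))).indexed()
  val updates  = sc.parallelize(Seq((1L, 10), (3L, 30)))
  val patched = vertices.zipJoinLeftWithKeys(updates)(
    (vid, data, upd) => (data, upd.getOrElse(0)))
  patched.collect().foreach(println)  // (1,(a,10)), (2,(b,0)), (3,(c,30))
  sc.stop()
}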
 
 
   /**
    * The IndexedRDD has its own optimized version of the pairRDDFunctions.
    */
@@ -120,19 +312,41 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 }
 
 
+  override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValues = index.rdd.zipPartitions(valuesRDD){
+      (keysIter, valuesIter) =>
+      val index = keysIter.next()
+      assert(!keysIter.hasNext)
+      val (oldValues, bs) = valuesIter.next()
+      assert(!valuesIter.hasNext)
+      // Allocate the array to store the results into
+      val newValues: Array[V] = oldValues.clone().asInstanceOf[Array[V]]
+      val newBS = new BitSet(oldValues.size)
+      // Populate the new Values
+      for( (k,i) <- index ) {
+        if ( bs(i) && f( (k, oldValues(i)) ) ) { newBS(i) = true }
+        else { newValues(i) = null.asInstanceOf[V] }
+      }
+      Array((newValues, newBS)).iterator
+    }
+    new IndexedRDD[K,V](index, newValues)
+  }
 
 
   /**
    * Provide the RDD[(K,V)] equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
-    tuples.compute(part, context).flatMap { case (indexMap, values) =>
+    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
       // Walk the index to construct the key, value pairs
       indexMap.iterator
       // Extract rows with key value pairs and indicators
-      .map{ case (k, ind) => (k, values(ind)) }
+      .map{ case (k, ind) => (bs(ind), k, ind) }
       // Remove tuples that aren't actually present in the array
-      .filter{ case (_, valar) => valar != null && !valar.isEmpty()}
+      .filter( _._1 )
       // Extract the pair (removing the indicator from the tuple)
-      .flatMap{ case (k, valar) => valar.map(v => (k,v))}
+      .map( x => (x._2, values(x._3) ) )
     }
   }
@@ -143,82 +357,100 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 
 object IndexedRDD {
 
   /**
    * Construct an IndexedRDD from a regular RDD[(K,V)] using an existing index
    * if one is provided.
    */
+  def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] =
+    apply(rdd, (a:V, b:V) => a )
+
   def apply[K: ClassManifest, V: ClassManifest](
-      tbl: RDD[(K,V)],
-      existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = {
-    // Preaggregate and shuffle if necessary
+      rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = {
+    // Preaggregation.
+    val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
+    val partitioner = new HashPartitioner(rdd.partitions.size)
+    val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
 
-    if (existingIndex == null) { // If no index was provided
-      // Shuffle the table (if necessary)
-      val shuffledTbl =
-        if (tbl.partitioner.isEmpty) {
-          new ShuffledRDD[K, V, (K,V)](tbl, Partitioner.defaultPartitioner(tbl))
-        } else { tbl }
-
-      val groups = shuffledTbl.mapPartitions( iter => {
-        val indexMap = new BlockIndex[K]()
-        val values = new ArrayBuffer[Seq[V]]()
-        for ((k,v) <- iter){
-          if(!indexMap.contains(k)) {
-            val ind = indexMap.size
-            indexMap.put(k, ind)
-            values.append(ArrayBuffer.empty[V])
-          }
-          val ind = indexMap.get(k)
-          values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
-        }
-        List((indexMap, values.toSeq)).iterator
-      }, true).cache
-      // extract the index and the values
-      val index = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
-      val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
-      new IndexedRDD[K,V](new RDDIndex(index), values)
-    } else {
-      val index = existingIndex
-      val partitioner = index.rdd.partitioner match {
-        case Some(p) => p
-        case None => throw new SparkException("An index must have a partitioner.")
-      }
-
-      // Shuffle the table according to the index (if necessary)
-      val shuffledTbl =
-        if (tbl.partitioner == Some(partitioner)) {
-          tbl
-        } else {
-          new ShuffledRDD[K, V, (K,V)](tbl, partitioner)
-        }
-
-      // Use the index to build the new values table
-      val values = index.rdd.zipPartitions(shuffledTbl)(
-        (indexIter, tblIter) => {
-          // There is only one map
-          val index = indexIter.next()
-          assert(!indexIter.hasNext)
-          val values = new Array[Seq[V]](index.size)
-          for ((k,v) <- tblIter) {
-            if (!index.contains(k)) {
-              throw new SparkException("Error: Trying to bind an external index " +
-                "to an RDD which contains keys that are not in the index.")
-            }
-            val ind = index(k)
-            if (values(ind) == null) {
-              values(ind) = ArrayBuffer.empty[V]
-            }
-            values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
-          }
-          List(values.toSeq).iterator
-        })
-
-      new IndexedRDD[K,V](index, values)
-    }
-  }
+    val groups = preAgg.mapPartitions( iter => {
+      val indexMap = new BlockIndex[K]()
+      val values = new ArrayBuffer[V]()
+      val bs = new BitSet
+      for ((k,v) <- iter) {
+        if(!indexMap.contains(k)) {
+          val ind = indexMap.size
+          indexMap.put(k, ind)
+          values.append(v)
+          bs(ind) = true
+        } else {
+          val ind = indexMap.get(k)
+          values(ind) = reduceFunc(values(ind), v)
+        }
+      }
+      List( (indexMap, (values.toArray, bs)) ).iterator
+    }, true).cache
+    // extract the index and the values
+    val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
+    val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+    new IndexedRDD[K,V](new RDDIndex(index), values)
+  }
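A hedged sketch of the new constructor path (the import path of IndexedRDD is an assumption, and the data are illustrative): duplicate keys are pre-aggregated with reduceFunc while the index is built.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.IndexedRDD

object IndexedApplySketch extends App {
  val sc = new SparkContext("local", "indexed-apply-sketch")
  val counts = IndexedRDD(
    sc.parallelize(Seq(("x", 1), ("y", 1), ("x", 1))),
    (a: Int, b: Int) => a + b)        // reduceFunc collapses duplicate keys
  counts.collect().foreach(println)   // (x,2), (y,1)
  sc.stop()
}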
 
 
-  def reduceByKey[K: ClassManifest, V: ClassManifest](
-      rdd: RDD[(K,V)], reduceFun: (V, V) => V, index: RDDIndex[K]): IndexedRDD[K,V] = {
+  def apply[K: ClassManifest, V: ClassManifest](
+      rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] =
+    apply(rdd, index, (a:V,b:V) => a)
+
+
+  def apply[K: ClassManifest, V: ClassManifest](
+      rdd: RDD[(K,V)], index: RDDIndex[K],
+      reduceFunc: (V, V) => V): IndexedRDD[K,V] =
+    apply(rdd, index, (v:V) => v, reduceFunc, reduceFunc)
+  // {
+  //   // Get the index Partitioner
+  //   val partitioner = index.rdd.partitioner match {
+  //     case Some(p) => p
+  //     case None => throw new SparkException("An index must have a partitioner.")
+  //   }
+  //   // Preaggregate and shuffle if necessary
+  //   val partitioned =
+  //     if (rdd.partitioner != Some(partitioner)) {
+  //       // Preaggregation.
+  //       val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
+  //       rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
+  //     } else {
+  //       rdd
+  //     }
+
+  //   // Use the index to build the new values table
+  //   val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+  //     // There is only one map
+  //     val index = indexIter.next()
+  //     assert(!indexIter.hasNext())
+  //     val values = new Array[V](index.size)
+  //     val bs = new BitSet(index.size)
+  //     for ((k,v) <- tblIter) {
+  //       if (!index.contains(k)) {
+  //         throw new SparkException("Error: Trying to bind an external index " +
+  //           "to an RDD which contains keys that are not in the index.")
+  //       }
+  //       val ind = index(k)
+  //       if (bs(ind)) {
+  //         values(ind) = reduceFunc(values(ind), v)
+  //       } else {
+  //         values(ind) = v
+  //         bs(ind) = true
+  //       }
+  //     }
+  //     List((values, bs)).iterator
+  //   })
+  //   new IndexedRDD[K,V](index, values)
+  // } // end of apply
+
+
+  def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+      rdd: RDD[(K,V)],
+      index: RDDIndex[K],
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C): IndexedRDD[K,C] = {
+    // Get the index Partitioner
+    val partitioner = index.rdd.partitioner match {
+      case Some(p) => p
@@ -228,11 +460,11 @@ object IndexedRDD {
     val partitioned =
       if (rdd.partitioner != Some(partitioner)) {
         // Preaggregation.
-        val aggregator = new Aggregator[K, V, V](v => v, reduceFun, reduceFun)
-        val combined = rdd.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-        combined.partitionBy(partitioner) //new ShuffledRDD[K, V, (K, V)](combined, partitioner)
+        val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue,
+          mergeCombiners)
+        rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
       } else {
-        rdd
+        rdd.mapValues(x => createCombiner(x))
       }
 
     // Use the index to build the new values table
@@ -240,25 +472,26 @@ object IndexedRDD {
         // There is only one map
         val index = indexIter.next()
         assert(!indexIter.hasNext)
-        val values = new Array[Array[V]](index.size)
-        for ((k,v) <- tblIter) {
+        val values = new Array[C](index.size)
+        val bs = new BitSet(index.size)
+        for ((k,c) <- tblIter) {
           if (!index.contains(k)) {
             throw new SparkException("Error: Trying to bind an external index " +
               "to an RDD which contains keys that are not in the index.")
           }
           val ind = index(k)
-          if (values(ind) == null) {
-            values(ind) = Array(v)
+          if (bs(ind)) {
+            values(ind) = mergeCombiners(values(ind), c)
           } else {
-            values(ind)(0) = reduceFun(values(ind).head, v)
+            values(ind) = c
+            bs(ind) = true
           }
         }
-        List(values.view.map(x => if (x != null) x.toSeq else null ).toSeq).iterator
+        List((values, bs)).iterator
       })
-    new IndexedRDD[K,V](index, values)
-
-  }
+    new IndexedRDD(index, values)
+  } // end of apply
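A hedged sketch of the combiner-based apply defined above (import path assumed, data illustrative): values are aggregated into a combiner type C against an existing index, here collecting Ints into Lists — the same pattern createVid2Pid uses in GraphImpl below.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.IndexedRDD

object CombinerApplySketch extends App {
  val sc = new SparkContext("local", "combiner-apply-sketch")
  val base = sc.parallelize(Seq((1L, "a"), (2L, "b"))).indexed()
  val obs  = sc.parallelize(Seq((1L, 10), (1L, 11), (2L, 20)))
  val grouped = IndexedRDD[Long, Int, List[Int]](obs, base.index,
    (v: Int) => List(v),                      // createCombiner
    (c: List[Int], v: Int) => v :: c,         // mergeValue
    (a: List[Int], b: List[Int]) => a ++ b)   // mergeCombiners
  grouped.collect().foreach(println)          // (1,List(11, 10)), (2,List(20))
  sc.stop()
}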
 
   /**
    * Construct an index of the unique values in a given RDD.
 
@@ -278,9 +511,7 @@ object IndexedRDD {
       }
     case Some(partitioner) =>
       tbl.partitionBy(partitioner)
-      // new ShuffledRDD[K, Boolean](tbl, partitioner)
-
   }
 
   val index = shuffledTbl.mapPartitions( iter => {
     val indexMap = new BlockIndex[K]()
 
@@ -22,6 +22,8 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
+
+import scala.collection.mutable.BitSet
 
 import org.apache.spark._
 
@@ -41,11 +43,15 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
    */
   override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
     val cleanF = self.index.rdd.context.clean(f)
-    val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
-      case null => null
-      case row => row.map(x => f(x))
-    }), true)
-    new IndexedRDD[K,U](self.index, newValues)
+    val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
+      case (values, bs) =>
+        val newValues = new Array[U](values.size)
+        for ( ind <- bs ) {
+          newValues(ind) = f(values(ind))
+        }
+        (newValues, bs)
+      }, preservesPartitioning = true)
+    new IndexedRDD[K,U](self.index, newValuesRDD)
   }
 
@@ -55,20 +61,19 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
    */
   override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
     val cleanF = self.index.rdd.context.clean(f)
-    val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) =>
+    val newValues = self.index.rdd.zipPartitions(self.valuesRDD){
+      (keysIter, valuesIter) =>
       val index = keysIter.next()
       assert(!keysIter.hasNext)
-      val oldValues = valuesIter.next()
+      val (oldValues, bs) = valuesIter.next()
       assert(!valuesIter.hasNext)
       // Allocate the array to store the results into
-      val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size)
+      val newValues: Array[U] = new Array[U](oldValues.size)
       // Populate the new Values
       for( (k,i) <- index ) {
-        if(oldValues(i) != null) {
-          newValues(i) = oldValues(i).map( v => f(k,v) )
-        }
+        if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
       }
-      Array(newValues.toSeq).iterator
+      Array((newValues, bs)).iterator
     }
     new IndexedRDD[K,U](self.index, newValues)
   }
@@ -81,11 +86,20 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
    */
   override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
     val cleanF = self.index.rdd.context.clean(f)
-    val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
-      case null => null
-      case row => row.flatMap(x => f(x))
-    }), true)
-    new IndexedRDD[K,U](self.index, newValues)
+    val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
+      case (values, bs) =>
+        val newValues = new Array[U](values.size)
+        val newBS = new BitSet(values.size)
+        for ( ind <- bs ) {
+          val res = f(values(ind))
+          if(!res.isEmpty) {
+            newValues(ind) = res.toIterator.next()
+            newBS(ind) = true
+          }
+        }
+        (newValues, newBS)
+      }, preservesPartitioning = true)
+    new IndexedRDD[K,U](self.index, newValuesRDD)
   }
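Note the semantics above: because the index (and hence the slot array) is fixed, flatMapValues cannot grow a slot, so at most the first element of each result is kept and slots whose result is empty are cleared from the bitset. A Spark-free sketch of that per-slot transform (illustrative data):

import scala.collection.mutable.BitSet

object FlatMapSlotSketch extends App {
  val values = Array(1, 2, 3)
  val bs     = BitSet(0, 1, 2)
  val f: Int => TraversableOnce[Int] = x => if (x % 2 == 1) Seq(x, x * 10) else Seq.empty

  val newValues = new Array[Int](values.size)
  val newBS     = new BitSet(values.size)
  for (ind <- bs) {
    val res = f(values(ind))
    if (!res.isEmpty) {                  // keep only the first element, as above
      newValues(ind) = res.toIterator.next()
      newBS(ind) = true
    }
  }
  println((newValues.toList, newBS))     // (List(1, 0, 3), BitSet(0, 2))
}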
 
 
@@ -105,31 +119,18 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
       partitioner: Partitioner,
       mapSideCombine: Boolean = true,
       serializerClass: String = null): RDD[(K, C)] = {
-    val newValues = self.valuesRDD.mapPartitions(
-      _.map{ groups: Seq[Seq[V]] =>
-        groups.map{ group: Seq[V] =>
-          if (group != null && !group.isEmpty) {
-            val c: C = createCombiner(group.head)
-            val sum: C = group.tail.foldLeft(c)(mergeValue)
-            Seq(sum)
-          } else {
-            null
-          }
-        }
-      }, true)
-    new IndexedRDD[K,C](self.index, newValues)
+    mapValues(createCombiner)
   }
 
 
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
-   */
-  override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
-    val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
-    new IndexedRDD[K, Seq[V]](self.index, newValues)
-  }
+  // /**
+  //  * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+  //  * resulting RDD with the existing partitioner/parallelism level.
+  //  */
+  // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+  //   val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
+  //   new IndexedRDD[K, Seq[V]](self.index, newValues)
+  // }
 
 
   /**
@@ -146,23 +147,24 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
         // However it is possible that both RDDs are missing a value for a given key in
         // which case the returned RDD should have a null value
         val newValues =
-          self.valuesRDD.zipPartitions(other.valuesRDD)(
-            (thisIter, otherIter) => {
-            val thisValues: Seq[Seq[V]] = thisIter.next()
+          self.valuesRDD.zipPartitions(other.valuesRDD){
+            (thisIter, otherIter) =>
+            val (thisValues, thisBS) = thisIter.next()
             assert(!thisIter.hasNext)
-            val otherValues: Seq[Seq[W]] = otherIter.next()
+            val (otherValues, otherBS) = otherIter.next()
             assert(!otherIter.hasNext)
-            // Zip the values and if both arrays are null then the key is not present and
-            // so the resulting value must be null (not a tuple of empty sequences)
-            val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{
-              case (null, null) => null // The key is not present in either RDD
-              case (a, null) => Seq((a, Seq.empty[W]))
-              case (null, b) => Seq((Seq.empty[V], b))
-              case (a, b) => Seq((a,b))
-            }.toSeq
-            List(tmp).iterator
-          })
-        new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues)
+
+            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
+            val newBS = thisBS | otherBS
+
+            for( ind <- newBS ) {
+              val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+              val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+              newValues(ind) = (a, b)
+            }
+            List((newValues, newBS)).iterator
+          }
+        new IndexedRDD(self.index, newValues)
       }
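A Spark-free sketch of the cogroup kernel above (illustrative data): the result bitset is the union of the two sides, and a missing side becomes Seq.empty rather than null.

import scala.collection.mutable.BitSet

object CogroupKernelSketch extends App {
  val thisValues  = Array(1, 2, 3)
  val thisBS      = BitSet(0, 2)      // keys at slots 0 and 2 on the left
  val otherValues = Array("a", "b", "c")
  val otherBS     = BitSet(1, 2)      // keys at slots 1 and 2 on the right

  val newBS = thisBS | otherBS        // a key present on either side survives
  val newValues = new Array[(Seq[Int], Seq[String])](thisValues.size)
  for (ind <- newBS) {
    val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[Int]
    val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[String]
    newValues(ind) = (a, b)
  }
  for (ind <- newBS) println((ind, newValues(ind)))
  // (0,(List(1),List())), (1,(List(),List(b))), (2,(List(3),List(c)))
}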
       case other: IndexedRDD[_, _]
         if self.index.rdd.partitioner == other.index.rdd.partitioner => {
@@ -197,26 +199,33 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
             val newIndex = newIndexIter.next()
             assert(!newIndexIter.hasNext)
             // Get the corresponding indices and values for this and the other IndexedRDD
-            val (thisIndex, thisValues) = thisTuplesIter.next()
+            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
             assert(!thisTuplesIter.hasNext)
-            val (otherIndex, otherValues) = otherTuplesIter.next()
+            val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
             assert(!otherTuplesIter.hasNext)
             // Preallocate the new Values array
-            val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size)
+            val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
+            val newBS = new BitSet(newIndex.size)
+
             // Lookup the sequences in both submaps
             for ((k,ind) <- newIndex) {
-              val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
-              val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
-              // if either of the sequences is not null then the key was in one of the two tables
-              // and so the value should appear in the returned table
-              newValues(ind) = (thisSeq, otherSeq) match {
-                case (null, null) => null
-                case (a, null) => Seq( (a, Seq.empty[W]) )
-                case (null, b) => Seq( (Seq.empty[V], b) )
-                case (a, b) => Seq( (a,b) )
-              }
+              // Get the left key
+              val a = if (thisIndex.contains(k)) {
+                val ind = thisIndex.get(k)
+                if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+              } else Seq.empty[V]
+              // Get the right key
+              val b = if (otherIndex.contains(k)) {
+                val ind = otherIndex.get(k)
+                if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+              } else Seq.empty[W]
+              // If at least one key was present then we generate a tuple.
+              if (!a.isEmpty || !b.isEmpty) {
+                newValues(ind) = (a, b)
+                newBS(ind) = true
+              }
             }
-            List(newValues.toSeq).iterator
+            List((newValues, newBS)).iterator
           })
         new IndexedRDD(new RDDIndex(newIndex), newValues)
       }
@@ -238,44 +247,48 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
         self.tuples.zipPartitions(otherShuffled)(
           (thisTuplesIter, otherTuplesIter) => {
             // Get the corresponding indices and values for this IndexedRDD
-            val (thisIndex, thisValues) = thisTuplesIter.next()
+            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
             assert(!thisTuplesIter.hasNext)
             // Construct a new index
             val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
             // Construct a new array Buffer to store the values
-            val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null)
+            val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
+            val newBS = new BitSet(thisValues.size)
            // populate the newValues with the values in this IndexedRDD
             for ((k,i) <- thisIndex) {
-              if (thisValues(i) != null) {
-                newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
+              if (thisBS(i)) {
+                newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
+                newBS(i) = true
               }
             }
             // Now iterate through the other tuples updating the map
             for ((k,w) <- otherTuplesIter){
-              if (!newIndex.contains(k)) {
-                // update the index
-                val ind = newIndex.size
-                newIndex.put(k, ind)
-                // Update the values
-                newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
-              } else {
-                val ind = newIndex.get(k)
-                if(newValues(ind) == null) {
-                  // If the other key was in the index but not in the values
-                  // of this indexed RDD then create a new values entry for it
-                  newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
-                } else {
-                  newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
-                }
+              if (newIndex.contains(k)) {
+                val ind = newIndex.get(k)
+                if(newBS(ind)) {
+                  newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
+                } else {
+                  // If the other key was in the index but not in the values
+                  // of this indexed RDD then create a new values entry for it
+                  newBS(ind) = true
+                  newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
+                }
+              } else {
+                // update the index
+                val ind = newIndex.size
+                newIndex.put(k, ind)
+                newBS(ind) = true
+                // Update the values
+                newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
               }
             }
-            // Finalize the new values array
-            val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
-              newValues.view.map{
-                case null => null
-                case (s, ab) => Seq((s, ab.toSeq))
-              }.toSeq
-            List( (newIndex, newValuesArray) ).iterator
+            // // Finalize the new values array
+            // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
+            //   newValues.view.map{
+            //     case null => null
+            //     case (s, ab) => Seq((s, ab.toSeq))
+            //   }.toSeq
+            List( (newIndex, (newValues.toArray, newBS)) ).iterator
          }).cache()
 
         // Extract the index and values from the above RDD
 
@@ -710,16 +710,19 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
   def values: RDD[V] = self.map(_._2)
 
 
+  def indexed(): IndexedRDD[K,V] = IndexedRDD(self)
+
+  def indexed(numPartitions: Int): IndexedRDD[K,V] =
+    IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions)))
+
+  def indexed(partitioner: Partitioner): IndexedRDD[K,V] =
+    IndexedRDD(self.partitionBy(partitioner))
+
-  def indexed(existingIndex: RDDIndex[K] = null): IndexedRDD[K,V] =
+  def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] =
     IndexedRDD(self, existingIndex)
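A hedged sketch of the indexed() helpers (illustrative data and names): each variant controls how the underlying index is partitioned, or binds the pairs to an index that already exists.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object IndexedHelpersSketch extends App {
  val sc = new SparkContext("local", "indexed-helpers-sketch")
  val pairs = sc.parallelize(Seq((1L, "a"), (2L, "b")))
  val byDefault = pairs.indexed()                        // default hash partitioning
  val byCount   = pairs.indexed(4)                       // hash into 4 partitions
  val byPart    = pairs.indexed(new HashPartitioner(2))  // explicit partitioner
  val shared    = pairs.indexed(byCount.index)           // bind to an existing index
  println(shared.collect().toList)
  sc.stop()
}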
 
 
   private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
 
   private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
 
@@ -7,7 +7,6 @@ import org.apache.spark.graph._
 /**
  * A partition of edges in 3 large columnar arrays.
  */
-private[graph]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
     val srcIds: Array[Vid],
     val dstIds: Array[Vid],
 
@@ -4,7 +4,7 @@ import scala.collection.mutable.ArrayBuilder
 import org.apache.spark.graph._
 
 
-private[graph]
+//private[graph]
 class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
 ED: ClassManifest]{
   val srcIds = new VertexArrayList
 
@@ -20,7 +20,7 @@ ED: ClassManifest]{
   }
 
   def toEdgePartition: EdgePartition[ED] = {
-    new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
+    new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
   }
 
 
@@ -3,6 +3,7 @@ package org.apache.spark.graph.impl
 import scala.collection.JavaConversions._
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.Partitioner
 
@@ -72,7 +73,8 @@ object EdgeTripletBuilder {
       new EdgeTripletIterator(vmap, edgePartition)
     }
    ClosureCleaner.clean(iterFun)
-    vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition
+    vTableReplicated.zipJoinRDD(eTable)
+      .mapPartitions( iterFun ) // end of map partition
   }
 }
 
@@ -83,7 +85,7 @@ object EdgeTripletBuilder {
  */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vTable: IndexedRDD[Vid, VD],
-    @transient val vid2pid: IndexedRDD[Vid, Pid],
+    @transient val vid2pid: IndexedRDD[Vid, Array[Pid]],
     @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]])
   extends Graph[VD, ED] {
 
@@ -144,7 +146,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     val numVertices = this.numVertices
     val numEdges = this.numEdges
     val replicationRatio =
-      vid2pid.groupByKey().map(kv => kv._2.size).sum / vTable.count
+      vid2pid.map(kv => kv._2.size).sum / vTable.count
     val loadArray =
       eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
     val minLoad = loadArray.min
 
@@ -342,7 +344,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     ClosureCleaner.clean(reduceFunc)
 
     // Map and preaggregate
-    val preAgg = vTableReplicated.join(eTable).flatMap{
+    val preAgg = vTableReplicated.zipJoinRDD(eTable).flatMap{
       case (pid, (vmap, edgePartition)) =>
       val aggMap = new VertexHashMap[A]
       val et = new EdgeTriplet[VD, ED]
 
@@ -363,19 +365,15 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     }.partitionBy(vTable.index.rdd.partitioner.get)
 
     // do the final reduction reusing the index map
-    IndexedRDD.reduceByKey(preAgg, reduceFunc, vTable.index)
+    IndexedRDD(preAgg, vTable.index, reduceFunc)
   }
 
 
   override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
     (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
     : Graph[VD2, ED] = {
 
     ClosureCleaner.clean(updateF)
 
-    val newVTable = vTable.leftOuterJoin(updates).mapValuesWithKeys{
-      case (vid, (data, other)) => updateF(vid, data, other)
-    }.asInstanceOf[IndexedRDD[Vid,VD2]]
+    val newVTable = vTable.zipJoinLeftWithKeys(updates)(updateF)
     new GraphImpl(newVTable, vid2pid, eTable)
   }
 
@@ -451,34 +449,38 @@ object GraphImpl {
       val data = message.data
       builder.add(data._1, data._2, data._3)
     }
-    Iterator((pid, builder.toEdgePartition))
+    val edgePartition = builder.toEdgePartition
+    Iterator((pid, edgePartition))
     }, preservesPartitioning = true).indexed()
   }
 
 
   protected def createVid2Pid[ED: ClassManifest](
       eTable: IndexedRDD[Pid, EdgePartition[ED]],
-      vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Pid] = {
-    eTable.mapPartitions { iter =>
+      vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = {
+    val preAgg = eTable.mapPartitions { iter =>
       val (pid, edgePartition) = iter.next()
       val vSet = new VertexSet
      edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
       vSet.iterator.map { vid => (vid.toLong, pid) }
-    }.indexed(vTableIndex)
-  }
+    }
+    IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
+      (p: Pid) => ArrayBuffer(p),
+      (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
+      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
+      .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]]
+  }
 
 
   protected def createVTableReplicated[VD: ClassManifest, ED: ClassManifest](
-      vTable: IndexedRDD[Vid, VD], vid2pid: IndexedRDD[Vid, Pid],
+      vTable: IndexedRDD[Vid, VD], vid2pid: IndexedRDD[Vid, Array[Pid]],
       eTable: IndexedRDD[Pid, EdgePartition[ED]]):
     IndexedRDD[Pid, VertexHashMap[VD]] = {
     // Join vid2pid and vTable, generate a shuffle dependency on the joined
     // result, and get the shuffle id so we can use it on the slave.
-    vTable.cogroup(vid2pid)
-      .flatMap { case (vid, (vdatas, pids)) =>
-        pids.iterator.map {
-          pid => MessageToPartition(pid, (vid, vdatas.head))
-        }
+    vTable.zipJoinRDD(vid2pid)
+      .flatMap { case (vid, (vdata, pids)) =>
+        pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
       }
       .partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner
       .mapPartitionsWithIndex( (pid, iter) => {