From 11a44d0ec98a746d252980cb745962ac2c079336 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Mon, 14 Oct 2013 19:37:52 -0700 Subject: [PATCH 1/4] Introducing indexedrdd The rest of indexed rdd --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../apache/spark/api/java/JavaPairRDD.scala | 78 ++- .../org/apache/spark/rdd/IndexedRDD.scala | 256 ++++++++++ .../spark/rdd/IndexedRDDFunctions.scala | 295 +++++++++++ .../apache/spark/rdd/PairRDDFunctions.scala | 66 ++- .../main/scala/org/apache/spark/rdd/RDD.scala | 17 + .../apache/spark/rdd/IndexedRDDSuite.scala | 461 ++++++++++++++++++ 7 files changed, 1139 insertions(+), 36 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala create mode 100644 core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala create mode 100644 core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b7a8179786..f3723a4f9d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -928,7 +928,7 @@ object SparkContext { // TODO: Add AccumulatorParams for other types, e.g. lists and strings implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = - new PairRDDFunctions(rdd) + rdd.pairRDDFunctions implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( rdd: RDD[(K, V)]) = diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index a6518abf45..2f94ae5fa8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -264,8 +264,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, partitioner)) + } /** * Perform a left outer join of `this` and `other`. 
For each element (k, v) in `this`, the @@ -275,6 +278,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -287,6 +292,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -325,16 +332,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other)) + } /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, numPartitions)) + } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -343,6 +356,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -354,6 +369,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * into `numPartitions` partitions. */ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -365,6 +382,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD using the existing partitioner/parallelism level. 
*/ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -376,6 +395,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD into the given number of partitions. */ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -412,55 +433,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. 
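The pattern repeated throughout these JavaPairRDD changes is worth spelling out: Java callers cannot supply a Scala ClassManifest for W, so one is manufactured from the AnyRef manifest and cast. A minimal sketch of the idiom, with a hypothetical helper name that is not part of the patch:

    // Hypothetical helper showing the fake-manifest idiom used above. Java-facing
    // generic methods have no ClassManifest in scope, so the AnyRef manifest is
    // cast; presumably adequate here because values arriving from Java are always
    // boxed reference types.
    def fakeClassManifest[W]: ClassManifest[W] =
      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]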
*/ - def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) - + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) + } /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.groupWith(other))) + } /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) + } /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala new file mode 100644 index 0000000000..8d2e9782c2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import java.nio.ByteBuffer + + +import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark._ +import org.apache.spark.rdd._ +import org.apache.spark.SparkContext._ +import org.apache.spark.Partitioner._ + +import org.apache.spark.storage.StorageLevel + + + + + + +/** + * The BlockIndex is the internal map structure used inside the index + * of the IndexedRDD. + */ +class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int] + + +/** + * The RDDIndex is an opaque type used to represent the organization + * of values in an RDD + */ +class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) { + def persist(newLevel: StorageLevel): RDDIndex[K] = { + rdd.persist(newLevel) + return this + } + + def partitioner: Partitioner = rdd.partitioner.get +} + + + + + +/** + * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and + * organizing the values to enable faster join operations. + * + * In addition to providing the basic RDD[(K,V)] functionality the IndexedRDD + * exposes an index member which can be used to "key" other IndexedRDDs + * + */ +class IndexedRDD[K: ClassManifest, V: ClassManifest]( + @transient val index: RDDIndex[K], + @transient val valuesRDD: RDD[ Seq[Seq[V]] ]) + extends RDD[(K, V)](index.rdd.context, + List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { + + /** + * An internal representation which joins the block indices with the values + */ + protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) + + + /** + * The partitioner is defined by the index + */ + override val partitioner = index.rdd.partitioner + + + + + + /** + * The actual partitions are defined by the tuples. + */ + override def getPartitions: Array[Partition] = tuples.getPartitions + + /** + * The preferred locations are computed based on the preferred locations of the tuples. + */ + override def getPreferredLocations(s: Partition): Seq[String] = + tuples.getPreferredLocations(s) + + + /** + * Caching an IndexedRDD causes the index and values to be cached separately. + */ + override def persist(newLevel: StorageLevel): RDD[(K,V)] = { + index.persist(newLevel) + valuesRDD.persist(newLevel) + return this + } + + + override def pairRDDFunctions[K1, V1]( + implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]): + PairRDDFunctions[K1, V1] = { + new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]]) + } + + + + + + /** + * Provide the RDD[(K,V)] equivalent output. 
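To make the internal layout concrete before the compute method that follows, here is a small local sketch, using plain Scala collections and hypothetical data rather than Spark, of how one partition's BlockIndex and values sequence unroll back into (K, V) pairs:

    // One partition of an IndexedRDD, sketched with ordinary collections: the
    // index maps each key to a slot, and the values sequence holds the values
    // for that slot, or null when the key has no values in this RDD.
    val index: Map[String, Int] = Map("a" -> 0, "b" -> 1, "c" -> 2)
    val values: Seq[Seq[Int]] = Seq(Seq(1, 2), null, Seq(3))
    val pairs = index.iterator
      .map { case (k, slot) => (k, values(slot)) }
      .filter { case (_, vs) => vs != null && !vs.isEmpty }
      .flatMap { case (k, vs) => vs.map(v => (k, v)) }
      .toList
    // pairs contains ("a", 1), ("a", 2) and ("c", 3); "b" is skipped because
    // its slot is null.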
+ */ + override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + tuples.compute(part, context).flatMap { case (indexMap, values) => + // Walk the index to construct the key, value pairs + indexMap.iterator + // Extract rows with key value pairs and indicators + .map{ case (k, ind) => (k, values(ind)) } + // Remove tuples that aren't actually present in the array + .filter{ case (_, valar) => valar != null && !valar.isEmpty()} + // Extract the pair (removing the indicator from the tuple) + .flatMap{ case (k, valar) => valar.map(v => (k,v))} + } + } + +} // End of IndexedRDD + + + + +object IndexedRDD { + def apply[K: ClassManifest, V: ClassManifest]( + tbl: RDD[(K,V)], + existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = { + + if (existingIndex == null) { + // Shuffle the table (if necessary) + val shuffledTbl = + if (tbl.partitioner.isEmpty) { + new ShuffledRDD[K, V, (K,V)](tbl, Partitioner.defaultPartitioner(tbl)) + } else { tbl } + + val groups = shuffledTbl.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + val values = new ArrayBuffer[Seq[V]]() + for ((k,v) <- iter){ + if(!indexMap.contains(k)) { + val ind = indexMap.size + indexMap.put(k, ind) + values.append(ArrayBuffer.empty[V]) + } + val ind = indexMap.get(k) + values(ind).asInstanceOf[ArrayBuffer[V]].append(v) + } + List((indexMap, values.toSeq)).iterator + }, true).cache + // extract the index and the values + val index = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) + val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + new IndexedRDD[K,V](new RDDIndex(index), values) + } else { + val index = existingIndex + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + + // Shuffle the table according to the index (if necessary) + val shuffledTbl = + if (tbl.partitioner == Some(partitioner)) { + tbl + } else { + new ShuffledRDD[K, V, (K,V)](tbl, partitioner) + } + + // Use the index to build the new values table + val values = index.rdd.zipPartitions(shuffledTbl)( + (indexIter, tblIter) => { + // There is only one map + val index = indexIter.next() + assert(!indexIter.hasNext()) + val values = new Array[Seq[V]](index.size) + for ((k,v) <- tblIter) { + if (!index.contains(k)) { + throw new SparkException("Error: Trying to bind an external index " + + "to an RDD which contains keys that are not in the index.") + } + val ind = index(k) + if (values(ind) == null) { + values(ind) = ArrayBuffer.empty[V] + } + values(ind).asInstanceOf[ArrayBuffer[V]].append(v) + } + List(values.toSeq).iterator + }) + + new IndexedRDD[K,V](index, values) + } + } + + /** + * Construct and index of the unique values in a given RDD. + */ + def makeIndex[K: ClassManifest](keys: RDD[K], + partitioner: Option[Partitioner] = None): RDDIndex[K] = { + // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD + // Ugly hack :-(. In order to partition the keys they must have values. + val tbl = keys.mapPartitions(_.map(k => (k, false)), true) + // Shuffle the table (if necessary) + val shuffledTbl = partitioner match { + case None => { + if (tbl.partitioner.isEmpty) { + // @todo: I don't need the boolean its only there to be the second type of the shuffle. 
+ new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl)) + } else { tbl } + } + case Some(partitioner) => + tbl.partitionBy(partitioner) +// new ShuffledRDD[K, Boolean](tbl, partitioner) + } + + + val index = shuffledTbl.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + for ( (k,_) <- iter ){ + if(!indexMap.contains(k)){ + val ind = indexMap.size + indexMap.put(k, ind) + } + } + List(indexMap).iterator + }, true).cache + new RDDIndex(index) + } + +} + + + + + diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala new file mode 100644 index 0000000000..358ab57b0c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark._ + + + +class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V]) + extends PairRDDFunctions[K,V](self) { + + /** + * Construct a new IndexedRDD that is indexed by only the keys in the RDD + */ + def reindex(): IndexedRDD[K,V] = IndexedRDD(self) + + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ + case null => null + case row => row.map(x => f(x)) + }), true) + new IndexedRDD[K,U](self.index, newValues) + } + + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. 
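Before the remaining overrides, a short usage sketch of the API this patch adds, mirroring the test suite further down. It assumes a SparkContext named sc and hypothetical data; the point is that one index built over the key space can be shared by several tables:

    // Build an index over the unique keys, then index two tables against it.
    val index   = sc.parallelize(1 to 6).makeIndex()
    val ratings = sc.parallelize(Array((1, 2.0), (2, 3.5), (3, 1.0))).indexed(index)
    val counts  = sc.parallelize(Array((1, 10), (3, 7))).indexed(index)
    // Both IndexedRDDs share the same RDDIndex, so join/cogroup can merge the
    // value tables directly instead of hashing and shuffling the keys again.
    val joined = ratings.join(counts)   // RDD[(Int, (Double, Int))]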
+ */ + override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val oldValues = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + if(oldValues(i) != null) { + newValues(i) = oldValues(i).map( v => f(k,v) ) + } + } + Array(newValues.toSeq).iterator + } + new IndexedRDD[K,U](self.index, newValues) + } + + + + /** + * Pass each value in the key-value pair RDD through a flatMap function without changing the + * keys; this also retains the original RDD's partitioning. + */ + override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ + case null => null + case row => row.flatMap(x => f(x)) + }), true) + new IndexedRDD[K,U](self.index, newValues) + } + + + /** + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C + * Note that V and C can be different -- for example, one might group an RDD of type + * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: + * + * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) + * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) + * - `mergeCombiners`, to combine two C's into a single one. + */ + override def combineByKey[C: ClassManifest](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true, + serializerClass: String = null): RDD[(K, C)] = { + val newValues = self.valuesRDD.mapPartitions( + _.map{ groups: Seq[Seq[V]] => + groups.map{ group: Seq[V] => + if (group != null && !group.isEmpty) { + val c: C = createCombiner(group.head) + val sum: C = group.tail.foldLeft(c)(mergeValue) + Seq(sum) + } else { + null + } + } + }, true) + new IndexedRDD[K,C](self.index, newValues) + } + + + + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD with the existing partitioner/parallelism level. + */ + override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) + new IndexedRDD[K, Seq[V]](self.index, newValues) + } + + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): + IndexedRDD[K, (Seq[V], Seq[W])] = { + //RDD[(K, (Seq[V], Seq[W]))] = { + other match { + case other: IndexedRDD[_, _] if self.index == other.index => { + // if both RDDs share exactly the same index and therefore the same super set of keys + // then we simply merge the value RDDs. 
+ // However it is possible that both RDDs are missing a value for a given key in + // which case the returned RDD should have a null value + val newValues = + self.valuesRDD.zipPartitions(other.valuesRDD)( + (thisIter, otherIter) => { + val thisValues: Seq[Seq[V]] = thisIter.next() + assert(!thisIter.hasNext) + val otherValues: Seq[Seq[W]] = otherIter.next() + assert(!otherIter.hasNext) + // Zip the values and if both arrays are null then the key is not present and + // so the resulting value must be null (not a tuple of empty sequences) + val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{ + case (null, null) => null // The key is not present in either RDD + case (a, null) => Seq((a, Seq.empty[W])) + case (null, b) => Seq((Seq.empty[V], b)) + case (a, b) => Seq((a,b)) + }.toSeq + List(tmp).iterator + }) + new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues) + } + case other: IndexedRDD[_, _] + if self.index.rdd.partitioner == other.index.rdd.partitioner => { + // If both RDDs are indexed using different indices but with the same partitioners + // then we we need to first merge the indicies and then use the merged index to + // merge the values. + val newIndex = + self.index.rdd.zipPartitions(other.index.rdd)( + (thisIter, otherIter) => { + val thisIndex = thisIter.next() + assert(!thisIter.hasNext) + val otherIndex = otherIter.next() + assert(!otherIter.hasNext) + val newIndex = new BlockIndex[K]() + // @todo Merge only the keys that correspond to non-null values + // Merge the keys + newIndex.putAll(thisIndex) + newIndex.putAll(otherIndex) + // We need to rekey the index + var ctr = 0 + for (e <- newIndex.entrySet) { + e.setValue(ctr) + ctr += 1 + } + List(newIndex).iterator + }).cache() + // Use the new index along with the this and the other indices to merge the values + val newValues = + newIndex.zipPartitions(self.tuples, other.tuples)( + (newIndexIter, thisTuplesIter, otherTuplesIter) => { + // Get the new index for this partition + val newIndex = newIndexIter.next() + assert(!newIndexIter.hasNext) + // Get the corresponding indicies and values for this and the other IndexedRDD + val (thisIndex, thisValues) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext) + val (otherIndex, otherValues) = otherTuplesIter.next() + assert(!otherTuplesIter.hasNext) + // Preallocate the new Values array + val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size) + // Lookup the sequences in both submaps + for ((k,ind) <- newIndex) { + val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null + val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null + // if either of the sequences is not null then the key was in one of the two tables + // and so the value should appear in the returned table + newValues(ind) = (thisSeq, otherSeq) match { + case (null, null) => null + case (a, null) => Seq( (a, Seq.empty[W]) ) + case (null, b) => Seq( (Seq.empty[V], b) ) + case (a, b) => Seq( (a,b) ) + } + } + List(newValues.toSeq).iterator + }) + new IndexedRDD(new RDDIndex(newIndex), newValues) + } + case _ => { + // Get the partitioner from the index + val partitioner = self.index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) { + other + } else { + new ShuffledRDD[K, W, (K,W)](other, partitioner) + } + // Join the other 
RDD with this RDD building a new valueset and new index on the fly + val groups = + self.tuples.zipPartitions(otherShuffled)( + (thisTuplesIter, otherTuplesIter) => { + // Get the corresponding indicies and values for this IndexedRDD + val (thisIndex, thisValues) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext()) + // Construct a new index + val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] + // Construct a new array Buffer to store the values + val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null) + // populate the newValues with the values in this IndexedRDD + for ((k,i) <- thisIndex) { + if (thisValues(i) != null) { + newValues(i) = (thisValues(i), ArrayBuffer.empty[W]) + } + } + // Now iterate through the other tuples updating the map + for ((k,w) <- otherTuplesIter){ + if (!newIndex.contains(k)) { + // update the index + val ind = newIndex.size + newIndex.put(k, ind) + // Update the values + newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) + } else { + val ind = newIndex.get(k) + if(newValues(ind) == null) { + // If the other key was in the index but not in the values + // of this indexed RDD then create a new values entry for it + newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) + } else { + newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) + } + } + } + // Finalize the new values array + val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = + newValues.view.map{ + case null => null + case (s, ab) => Seq((s, ab.toSeq)) + }.toSeq + List( (newIndex, newValuesArray) ).iterator + }).cache() + + // Extract the index and values from the above RDD + val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) + val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + + new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) + } + } + } + + +} + +//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { } + + diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index a47c512275..569d74ae7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -68,7 +68,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -102,7 +102,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -247,7 +247,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. 
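The PairRDDFunctions hunks here, including the join signature that follows, add ClassManifest context bounds to the W and C type parameters, presumably because the indexed implementations allocate arrays and construct new IndexedRDDs over those types. A minimal sketch, with a hypothetical helper, of why such a bound is needed at all:

    // Allocating an Array[W] for a generic W requires a ClassManifest[W] at the
    // call site, which is what the new [W: ClassManifest] bounds provide.
    def preallocate[W: ClassManifest](n: Int): Array[W] = new Array[W](n)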
*/ - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) } @@ -259,7 +259,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): + RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { vs.iterator.map(v => (v, None)) @@ -275,7 +277,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { @@ -290,7 +292,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -318,7 +320,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = { join(other, defaultPartitioner(self, other)) } @@ -327,7 +329,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { join(other, new HashPartitioner(numPartitions)) } @@ -337,7 +339,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -347,7 +349,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. 
*/ - def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -357,7 +359,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -367,7 +369,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -386,16 +388,25 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - def mapValues[U](f: V => U): RDD[(K, U)] = { + def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { + self.map{ case (k,v) => (k, f(k,v)) } + } + /** * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } @@ -404,7 +415,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -419,7 +430,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. 
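A quick usage sketch of mapValuesWithKeys, the new primitive added in the hunk above. The data and the sc handle are hypothetical:

    // mapValuesWithKeys passes the key to the mapping function as well as the
    // value, unlike mapValues which sees only the value.
    val scores   = sc.parallelize(Array(("alice", 1.0), ("bob", 2.0)))
    val labelled = scores.mapValuesWithKeys((name, s) => name + "=" + s)
    // labelled contains ("alice", "alice=1.0") and ("bob", "bob=2.0")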
*/ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") @@ -435,7 +446,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -443,7 +454,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -452,7 +463,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -460,18 +471,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. 
*/ - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -692,6 +703,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) */ def values: RDD[V] = self.map(_._2) + + def indexed(numPartitions: Int): IndexedRDD[K,V] = + IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions))) + + def indexed(partitioner: Partitioner): IndexedRDD[K,V] = + IndexedRDD(self.partitionBy(partitioner)) + + + def indexed(existingIndex: RDDIndex[K] = null): IndexedRDD[K,V] = + IndexedRDD(self, existingIndex) + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1893627ee2..6776220835 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -792,6 +792,23 @@ abstract class RDD[T: ClassManifest]( return buf.toArray } + + + /** + * For RDD[(K,V)] this function returns a pair-functions object for this RDD + */ + def pairRDDFunctions[K, V]( + implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): + PairRDDFunctions[K, V] = { + new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]]) + } + + + def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] = + IndexedRDD.makeIndex(this, partitioner) + + + /** * Return the first element in this RDD. */ diff --git a/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala new file mode 100644 index 0000000000..3a2ce4e4da --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet + +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.rdd.IndexedRDD + +import org.apache.spark.SparkContext._ +import org.apache.spark._ + + + +class IndexedRDDSuite extends FunSuite with SharedSparkContext { + + def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = { + val set = new collection.mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + for (dep <- rdd.dependencies) { + set += dep.rdd + visit(dep.rdd) + } + } + visit(rdd) + set + } + + test("groupByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with duplicates") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with negative key hash codes") { + val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesForMinus1 = groups.find(_._1 == -1).get._2 + assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with many output partitions") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("reduceByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with collectAsMap") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collectAsMap() + assert(sums.size === 2) + assert(sums(1) === 7) + assert(sums(2) === 1) + } + + test("reduceByKey with many output partitons") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10) + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with partitioner") { + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is 
only 1 ShuffledRDD + val deps = lineage(sums) + + assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection + } + + + + test("joinIndexVsPair") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinIndexVsIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')), + (4, (-4, 'w')), + (4, (4, 'w')) + )) + } + + + test("join all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("leftOuterJoinIndex") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (4, (-4, Some('w'))), + (3, (1, None)) + )) + } + +test("leftOuterJoinIndextoIndexExternal") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, 
(1, None)) + )) + } + + + test("rightOuterJoin") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("rightOuterJoinIndex2Index") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("rightOuterJoinIndex2Indexshared") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("join with no matches index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + test("join with no matches shared index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + + test("join with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("join with many output partitions and two indices") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + + test("groupWith") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.groupWith(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), + (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), + (3, (ArrayBuffer(1), ArrayBuffer())), + (4, (ArrayBuffer(), ArrayBuffer('w'))) + )) + } + + test("zero-partition RDD") { + val emptyDir = Files.createTempDir() + val file = 
sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.size == 0) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } + + test("keys and values") { + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed() + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } + + test("default partitioner uses partition size") { + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + // test("default partitioner uses largest partitioner indexed to indexed") { + // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed() + // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed() + // val c = a.join(b) + // assert(c.partitions.size === 2000) + // } + + + + test("subtract") { + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... 
+ assert(c.partitioner === None) + } + + test("subtractByKey") { + + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed() + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + // test("subtractByKey with narrow dependency") { + // // use a deterministic partitioner + // val p = new Partitioner() { + // def numPartitions = 5 + // def getPartition(key: Any) = key.asInstanceOf[Int] + // } + + // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p)) + // // partitionBy so we have a narrow dependency + // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index) + // // more partitions/no partitioner so a shuffle dependency + // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index) + // val c = a.subtractByKey(b) + // assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + // assert(c.partitioner.get === p) + // } + + test("foldByKey") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val sums = pairs.foldByKey(0)(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("foldByKey with mutable result type") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() + // Fold the values using in-place mutation + val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() + assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) + // Check that the mutable objects in the original RDD were not changed + assert(bufs.collect().toSet === Set( + (1, ArrayBuffer(1)), + (1, ArrayBuffer(2)), + (1, ArrayBuffer(3)), + (1, ArrayBuffer(1)), + (2, ArrayBuffer(1)))) + } +} From bf059691f07b59d81b061033a01f1173cc7b2458 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Mon, 14 Oct 2013 19:59:11 -0700 Subject: [PATCH 2/4] Adding a few extra comments. --- .../org/apache/spark/rdd/IndexedRDD.scala | 25 +++++++++++-------- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index 8d2e9782c2..a881ee3a1d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -59,8 +59,6 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI - - /** * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and * organizing the values to enable faster join operations. 
@@ -75,10 +73,12 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( extends RDD[(K, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { + /** * An internal representation which joins the block indices with the values */ - protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) + protected[spark] val tuples = + new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) /** @@ -87,14 +87,12 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( override val partitioner = index.rdd.partitioner - - - /** * The actual partitions are defined by the tuples. */ override def getPartitions: Array[Partition] = tuples.getPartitions + /** * The preferred locations are computed based on the preferred locations of the tuples. */ @@ -112,6 +110,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } + /** + * The IndexedRDD has its own optimized version of the pairRDDFunctions. + */ override def pairRDDFunctions[K1, V1]( implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]): PairRDDFunctions[K1, V1] = { @@ -119,9 +120,6 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } - - - /** * Provide the RDD[(K,V)] equivalent output. */ @@ -144,11 +142,16 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( object IndexedRDD { + + /** + * Construct an IndexedRDD from a regular RDD[(K,V)] using an existing index + * if one is provided. + */ def apply[K: ClassManifest, V: ClassManifest]( tbl: RDD[(K,V)], existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = { - if (existingIndex == null) { + if (existingIndex == null) { // If no index was provided // Shuffle the table (if necessary) val shuffledTbl = if (tbl.partitioner.isEmpty) { @@ -248,7 +251,7 @@ object IndexedRDD { new RDDIndex(index) } -} +} // end of object IndexedRDD diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6776220835..f8a969cf10 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -804,6 +804,10 @@ abstract class RDD[T: ClassManifest]( } + /** + * Construct an index over the unique elements in this RDD. The + * index can then be used to organize a RDD[(T,V)]. + */ def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] = IndexedRDD.makeIndex(this, partitioner) From 3cb6dffce0ee7be237749b9f9e5db6e023e06054 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Tue, 15 Oct 2013 18:55:06 -0700 Subject: [PATCH 3/4] adding indexed reduce by key --- .../org/apache/spark/rdd/IndexedRDD.scala | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index a881ee3a1d..750a456715 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -216,6 +216,49 @@ object IndexedRDD { } } + def reduceByKey[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], reduceFun: (V, V) => V, index: RDDIndex[K]): IndexedRDD[K,V] = { + // Get the index Partitioner + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Preaggregate and shuffle if necessary + val partitioned = + if (rdd.partitioner != Some(partitioner)) { + // Preaggregation. 
+ val aggregator = new Aggregator[K, V, V](v => v, reduceFun, reduceFun) + val combined = rdd.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) + combined.partitionBy(partitioner) //new ShuffledRDD[K, V, (K, V)](combined, partitioner) + } else { + rdd + } + + // Use the index to build the new values table + val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + // There is only one map + val index = indexIter.next() + assert(!indexIter.hasNext()) + val values = new Array[Array[V]](index.size) + for ((k,v) <- tblIter) { + if (!index.contains(k)) { + throw new SparkException("Error: Trying to bind an external index " + + "to an RDD which contains keys that are not in the index.") + } + val ind = index(k) + if (values(ind) == null) { + values(ind) = Array(v) + } else { + values(ind)(0) = reduceFun(values(ind).head, v) + } + } + List(values.view.map(x => if (x != null) x.toSeq else null ).toSeq).iterator + }) + + new IndexedRDD[K,V](index, values) + + } + /** * Construct and index of the unique values in a given RDD. */ From 80e4ec3278c0e63edf0d2c293cf4c650d85eb976 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 16 Oct 2013 00:16:44 -0700 Subject: [PATCH 4/4] IndexedRDD now only supports unique keys --- .../org/apache/spark/rdd/IndexedRDD.scala | 164 +++++++-------- .../spark/rdd/IndexedRDDFunctions.scala | 199 +++++++++--------- .../apache/spark/rdd/PairRDDFunctions.scala | 7 +- 3 files changed, 184 insertions(+), 186 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index 750a456715..956e0fe5f9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -24,6 +24,7 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.BitSet import org.apache.spark._ import org.apache.spark.rdd._ @@ -69,7 +70,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI */ class IndexedRDD[K: ClassManifest, V: ClassManifest]( @transient val index: RDDIndex[K], - @transient val valuesRDD: RDD[ Seq[Seq[V]] ]) + @transient val valuesRDD: RDD[ (Array[V], BitSet) ]) extends RDD[(K, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { @@ -120,19 +121,40 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } + override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = { + val cleanF = index.rdd.context.clean(f) + val newValues = index.rdd.zipPartitions(valuesRDD){ + (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val (oldValues, bs) = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newValues: Array[V] = oldValues.clone().asInstanceOf[Array[V]] + val newBS = new BitSet(oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + if ( bs(i) && f( (k, oldValues(i)) ) ) { newBS(i) = true } + else { newValues(i) = null.asInstanceOf[V] } + } + Array((newValues, newBS)).iterator + } + new IndexedRDD[K,V](index, newValues) + } + /** * Provide the RDD[(K,V)] equivalent output. 
*/ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { - tuples.compute(part, context).flatMap { case (indexMap, values) => + tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => // Walk the index to construct the key, value pairs - indexMap.iterator + indexMap.iterator // Extract rows with key value pairs and indicators - .map{ case (k, ind) => (k, values(ind)) } + .map{ case (k, ind) => (bs(ind), k, ind) } // Remove tuples that aren't actually present in the array - .filter{ case (_, valar) => valar != null && !valar.isEmpty()} + .filter( _._1 ) // Extract the pair (removing the indicator from the tuple) - .flatMap{ case (k, valar) => valar.map(v => (k,v))} + .map( x => (x._2, values(x._3) ) ) } } @@ -143,81 +165,49 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( object IndexedRDD { - /** - * Construct an IndexedRDD from a regular RDD[(K,V)] using an existing index - * if one is provided. - */ +def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] = + apply(rdd, (a:V, b:V) => a ) + def apply[K: ClassManifest, V: ClassManifest]( - tbl: RDD[(K,V)], - existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = { + rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = { + // Preaggregate and shuffle if necessary + // Preaggregation. + val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + val partitioner = new HashPartitioner(rdd.partitions.size) + val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) - if (existingIndex == null) { // If no index was provided - // Shuffle the table (if necessary) - val shuffledTbl = - if (tbl.partitioner.isEmpty) { - new ShuffledRDD[K, V, (K,V)](tbl, Partitioner.defaultPartitioner(tbl)) - } else { tbl } - - val groups = shuffledTbl.mapPartitions( iter => { - val indexMap = new BlockIndex[K]() - val values = new ArrayBuffer[Seq[V]]() - for ((k,v) <- iter){ - if(!indexMap.contains(k)) { - val ind = indexMap.size - indexMap.put(k, ind) - values.append(ArrayBuffer.empty[V]) - } - val ind = indexMap.get(k) - values(ind).asInstanceOf[ArrayBuffer[V]].append(v) - } - List((indexMap, values.toSeq)).iterator - }, true).cache - // extract the index and the values - val index = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - new IndexedRDD[K,V](new RDDIndex(index), values) - } else { - val index = existingIndex - val partitioner = index.rdd.partitioner match { - case Some(p) => p - case None => throw new SparkException("An index must have a partitioner.") - } - - // Shuffle the table according to the index (if necessary) - val shuffledTbl = - if (tbl.partitioner == Some(partitioner)) { - tbl + val groups = preAgg.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + val values = new ArrayBuffer[V]() + val bs = new BitSet + for ((k,v) <- iter) { + if(!indexMap.contains(k)) { + val ind = indexMap.size + indexMap.put(k, ind) + values.append(v) + bs(ind) = true } else { - new ShuffledRDD[K, V, (K,V)](tbl, partitioner) + val ind = indexMap.get(k) + values(ind) = reduceFunc(values(ind), v) } - - // Use the index to build the new values table - val values = index.rdd.zipPartitions(shuffledTbl)( - (indexIter, tblIter) => { - // There is only one map - val index = indexIter.next() - assert(!indexIter.hasNext()) - val values = new Array[Seq[V]](index.size) - for ((k,v) <- tblIter) { - if (!index.contains(k)) { - throw new 
SparkException("Error: Trying to bind an external index " + - "to an RDD which contains keys that are not in the index.") - } - val ind = index(k) - if (values(ind) == null) { - values(ind) = ArrayBuffer.empty[V] - } - values(ind).asInstanceOf[ArrayBuffer[V]].append(v) - } - List(values.toSeq).iterator - }) - - new IndexedRDD[K,V](index, values) - } + } + List( (indexMap, (values.toArray, bs)) ).iterator + }, true).cache + // extract the index and the values + val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) + val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + new IndexedRDD[K,V](new RDDIndex(index), values) } - def reduceByKey[K: ClassManifest, V: ClassManifest]( - rdd: RDD[(K,V)], reduceFun: (V, V) => V, index: RDDIndex[K]): IndexedRDD[K,V] = { + + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] = + apply(rdd, index, (a:V,b:V) => a) + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K], + reduceFunc: (V, V) => V): IndexedRDD[K,V] = { // Get the index Partitioner val partitioner = index.rdd.partitioner match { case Some(p) => p @@ -227,9 +217,8 @@ object IndexedRDD { val partitioned = if (rdd.partitioner != Some(partitioner)) { // Preaggregation. - val aggregator = new Aggregator[K, V, V](v => v, reduceFun, reduceFun) - val combined = rdd.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) - combined.partitionBy(partitioner) //new ShuffledRDD[K, V, (K, V)](combined, partitioner) + val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) } else { rdd } @@ -239,26 +228,27 @@ object IndexedRDD { // There is only one map val index = indexIter.next() assert(!indexIter.hasNext()) - val values = new Array[Array[V]](index.size) + val values = new Array[V](index.size) + val bs = new BitSet(index.size) for ((k,v) <- tblIter) { if (!index.contains(k)) { throw new SparkException("Error: Trying to bind an external index " + "to an RDD which contains keys that are not in the index.") } val ind = index(k) - if (values(ind) == null) { - values(ind) = Array(v) + if (bs(ind)) { + values(ind) = reduceFunc(values(ind), v) } else { - values(ind)(0) = reduceFun(values(ind).head, v) + values(ind) = v + bs(ind) = true } } - List(values.view.map(x => if (x != null) x.toSeq else null ).toSeq).iterator + List((values, bs)).iterator }) - new IndexedRDD[K,V](index, values) + } // end of apply - } - + /** * Construct and index of the unique values in a given RDD. 
*/ @@ -277,9 +267,7 @@ object IndexedRDD { } case Some(partitioner) => tbl.partitionBy(partitioner) -// new ShuffledRDD[K, Boolean](tbl, partitioner) } - val index = shuffledTbl.mapPartitions( iter => { val indexMap = new BlockIndex[K]() diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala index 358ab57b0c..59e7d7da2b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala @@ -21,6 +21,8 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.BitSet + import org.apache.spark._ @@ -41,11 +43,15 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.map(x => f(x)) - }), true) - new IndexedRDD[K,U](self.index, newValues) + val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ + case (values, bs) => + val newValues = new Array[U](values.size) + for ( ind <- bs ) { + newValues(ind) = f(values(ind)) + } + (newValues, bs) + }, preservesPartitioning = true) + new IndexedRDD[K,U](self.index, newValuesRDD) } @@ -55,38 +61,37 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) => + val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ + (keysIter, valuesIter) => val index = keysIter.next() assert(keysIter.hasNext() == false) - val oldValues = valuesIter.next() + val (oldValues, bs) = valuesIter.next() assert(valuesIter.hasNext() == false) - // Allocate the array to store the results into - val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size) + // Allocate the array to store the results into + val newValues: Array[U] = new Array[U](oldValues.size) // Populate the new Values for( (k,i) <- index ) { - if(oldValues(i) != null) { - newValues(i) = oldValues(i).map( v => f(k,v) ) - } + if (bs(i)) { newValues(i) = f(k, oldValues(i)) } } - Array(newValues.toSeq).iterator + Array((newValues, bs)).iterator } new IndexedRDD[K,U](self.index, newValues) } - /** - * Pass each value in the key-value pair RDD through a flatMap function without changing the - * keys; this also retains the original RDD's partitioning. - */ - override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { - val cleanF = self.index.rdd.context.clean(f) - val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.flatMap(x => f(x)) - }), true) - new IndexedRDD[K,U](self.index, newValues) - } + // /** + // * Pass each value in the key-value pair RDD through a flatMap function without changing the + // * keys; this also retains the original RDD's partitioning. 
+ // */ + // override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { + // val cleanF = self.index.rdd.context.clean(f) + // val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ + // case null => null + // case row => row.flatMap(x => f(x)) + // }), true) + // new IndexedRDD[K,U](self.index, newValues) + // } /** @@ -105,31 +110,19 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)] = { - val newValues = self.valuesRDD.mapPartitions( - _.map{ groups: Seq[Seq[V]] => - groups.map{ group: Seq[V] => - if (group != null && !group.isEmpty) { - val c: C = createCombiner(group.head) - val sum: C = group.tail.foldLeft(c)(mergeValue) - Seq(sum) - } else { - null - } - } - }, true) - new IndexedRDD[K,C](self.index, newValues) + mapValues(createCombiner) } - /** - * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the existing partitioner/parallelism level. - */ - override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { - val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) - new IndexedRDD[K, Seq[V]](self.index, newValues) - } + // /** + // * Group the values for each key in the RDD into a single sequence. Hash-partitions the + // * resulting RDD with the existing partitioner/parallelism level. + // */ + // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + // val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) + // new IndexedRDD[K, Seq[V]](self.index, newValues) + // } /** @@ -146,23 +139,24 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K // However it is possible that both RDDs are missing a value for a given key in // which case the returned RDD should have a null value val newValues = - self.valuesRDD.zipPartitions(other.valuesRDD)( - (thisIter, otherIter) => { - val thisValues: Seq[Seq[V]] = thisIter.next() + self.valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) - val otherValues: Seq[Seq[W]] = otherIter.next() - assert(!otherIter.hasNext) - // Zip the values and if both arrays are null then the key is not present and - // so the resulting value must be null (not a tuple of empty sequences) - val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{ - case (null, null) => null // The key is not present in either RDD - case (a, null) => Seq((a, Seq.empty[W])) - case (null, b) => Seq((Seq.empty[V], b)) - case (a, b) => Seq((a,b)) - }.toSeq - List(tmp).iterator - }) - new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + + val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) + val newBS = thisBS | otherBS + + for( ind <- newBS ) { + val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + newValues(ind) = (a, b) + } + List((newValues, newBS)).iterator + } + new IndexedRDD(self.index, newValues) } case other: IndexedRDD[_, _] if self.index.rdd.partitioner == other.index.rdd.partitioner => { @@ -197,26 +191,33 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K val newIndex = newIndexIter.next() 
assert(!newIndexIter.hasNext) // Get the corresponding indicies and values for this and the other IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext) - val (otherIndex, otherValues) = otherTuplesIter.next() + val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() assert(!otherTuplesIter.hasNext) // Preallocate the new Values array - val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size) + val newValues = new Array[(Seq[V], Seq[W])](newIndex.size) + val newBS = new BitSet(newIndex.size) + // Lookup the sequences in both submaps for ((k,ind) <- newIndex) { - val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null - val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null - // if either of the sequences is not null then the key was in one of the two tables - // and so the value should appear in the returned table - newValues(ind) = (thisSeq, otherSeq) match { - case (null, null) => null - case (a, null) => Seq( (a, Seq.empty[W]) ) - case (null, b) => Seq( (Seq.empty[V], b) ) - case (a, b) => Seq( (a,b) ) + // Get the left key + val a = if (thisIndex.contains(k)) { + val ind = thisIndex.get(k) + if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + } else Seq.empty[V] + // Get the right key + val b = if (otherIndex.contains(k)) { + val ind = otherIndex.get(k) + if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + } else Seq.empty[W] + // If at least one key was present then we generate a tuple. + if (!a.isEmpty || !b.isEmpty) { + newValues(ind) = (a, b) + newBS(ind) = true } } - List(newValues.toSeq).iterator + List((newValues, newBS)).iterator }) new IndexedRDD(new RDDIndex(newIndex), newValues) } @@ -238,44 +239,48 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K self.tuples.zipPartitions(otherShuffled)( (thisTuplesIter, otherTuplesIter) => { // Get the corresponding indicies and values for this IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() assert(!thisTuplesIter.hasNext()) // Construct a new index val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] // Construct a new array Buffer to store the values - val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null) + val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) + val newBS = new BitSet(thisValues.size) // populate the newValues with the values in this IndexedRDD for ((k,i) <- thisIndex) { - if (thisValues(i) != null) { - newValues(i) = (thisValues(i), ArrayBuffer.empty[W]) + if (thisBS(i)) { + newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) + newBS(i) = true } } // Now iterate through the other tuples updating the map for ((k,w) <- otherTuplesIter){ - if (!newIndex.contains(k)) { + if (newIndex.contains(k)) { + val ind = newIndex.get(k) + if(newBS(ind)) { + newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) + } else { + // If the other key was in the index but not in the values + // of this indexed RDD then create a new values entry for it + newBS(ind) = true + newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) + } + } else { // update the index val ind = newIndex.size newIndex.put(k, ind) + newBS(ind) = true // Update the values - newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) - } else { - val ind = newIndex.get(k) - if(newValues(ind) == null) { - // If the other key was 
in the index but not in the values - // of this indexed RDD then create a new values entry for it - newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) - } else { - newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) - } + newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) } } - // Finalize the new values array - val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = - newValues.view.map{ - case null => null - case (s, ab) => Seq((s, ab.toSeq)) - }.toSeq - List( (newIndex, newValuesArray) ).iterator + // // Finalize the new values array + // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = + // newValues.view.map{ + // case null => null + // case (s, ab) => Seq((s, ab.toSeq)) + // }.toSeq + List( (newIndex, (newValues.toArray, newBS)) ).iterator }).cache() // Extract the index and values from the above RDD diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 569d74ae7a..eeb60d3ff9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -704,6 +704,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) def values: RDD[V] = self.map(_._2) + def indexed(): IndexedRDD[K,V] = IndexedRDD(self) + + def indexed(numPartitions: Int): IndexedRDD[K,V] = IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions))) @@ -711,9 +714,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) IndexedRDD(self.partitionBy(partitioner)) - def indexed(existingIndex: RDDIndex[K] = null): IndexedRDD[K,V] = + def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] = IndexedRDD(self, existingIndex) + + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
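[Editor's note, not part of the patch: to make the storage change in this last patch concrete, each value partition is now an (Array[V], BitSet) pair; the block index maps each key to a slot in the array, and the bitset records which slots actually hold a value. The following is a small self-contained sketch of the per-partition decoding that the reworked compute() performs, using plain Scala collections and made-up data rather than Spark.]

    import scala.collection.mutable.BitSet

    // A toy partition: key -> slot, the value array, and the occupancy bitset.
    val blockIndex = Map("a" -> 0, "b" -> 1, "c" -> 2)
    val values     = Array(10, 0, 30)   // slot 1 was never written
    val occupied   = BitSet(0, 2)       // only slots 0 and 2 hold real values

    // Mirrors IndexedRDD.compute: emit a (key, value) pair only if the slot's bit is set.
    val pairs = blockIndex.iterator
      .filter { case (_, slot) => occupied(slot) }
      .map    { case (key, slot) => (key, values(slot)) }
      .toList
    // pairs contains ("a", 10) and ("c", 30); "b" is dropped because its bit is unset.

Compared with the earlier Seq[Seq[V]] layout, the array-plus-bitset form avoids per-key sequence allocation and makes the new unique-keys invariant explicit. [End of editor's note.]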