From 06e5e6627f3856b5c6e3e60cbb167044de9ef6d4 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 17 Feb 2013 22:13:26 -0800 Subject: [PATCH] Renamed "splits" to "partitions" --- bagel/src/main/scala/spark/bagel/Bagel.scala | 20 ++-- .../bagel/examples/WikipediaPageRank.scala | 6 +- .../WikipediaPageRankStandalone.scala | 2 +- core/src/main/scala/spark/CacheManager.scala | 4 +- .../main/scala/spark/DoubleRDDFunctions.scala | 4 +- .../main/scala/spark/PairRDDFunctions.scala | 50 ++++---- .../spark/{Split.scala => Partition.scala} | 2 +- core/src/main/scala/spark/RDD.scala | 76 ++++++------ .../main/scala/spark/RDDCheckpointData.scala | 12 +- core/src/main/scala/spark/SparkContext.scala | 10 +- .../scala/spark/api/java/JavaDoubleRDD.scala | 6 +- .../scala/spark/api/java/JavaPairRDD.scala | 44 +++---- .../main/scala/spark/api/java/JavaRDD.scala | 6 +- .../scala/spark/api/java/JavaRDDLike.scala | 12 +- .../scala/spark/api/python/PythonRDD.scala | 10 +- .../partial/ApproximateActionListener.scala | 2 +- core/src/main/scala/spark/rdd/BlockRDD.scala | 16 +-- .../main/scala/spark/rdd/CartesianRDD.scala | 36 +++--- .../main/scala/spark/rdd/CheckpointRDD.scala | 20 ++-- .../main/scala/spark/rdd/CoGroupedRDD.scala | 22 ++-- .../main/scala/spark/rdd/CoalescedRDD.scala | 24 ++-- .../main/scala/spark/rdd/FilteredRDD.scala | 6 +- .../main/scala/spark/rdd/FlatMappedRDD.scala | 6 +- .../src/main/scala/spark/rdd/GlommedRDD.scala | 6 +- core/src/main/scala/spark/rdd/HadoopRDD.scala | 20 ++-- .../scala/spark/rdd/MapPartitionsRDD.scala | 8 +- ....scala => MapPartitionsWithIndexRDD.scala} | 12 +- core/src/main/scala/spark/rdd/MappedRDD.scala | 6 +- .../main/scala/spark/rdd/NewHadoopRDD.scala | 20 ++-- .../spark/rdd/ParallelCollectionRDD.scala | 18 +-- .../scala/spark/rdd/PartitionPruningRDD.scala | 16 +-- core/src/main/scala/spark/rdd/PipedRDD.scala | 6 +- .../src/main/scala/spark/rdd/SampledRDD.scala | 16 +-- .../main/scala/spark/rdd/ShuffledRDD.scala | 10 +- core/src/main/scala/spark/rdd/UnionRDD.scala | 30 ++--- core/src/main/scala/spark/rdd/ZippedRDD.scala | 32 ++--- .../scala/spark/scheduler/DAGScheduler.scala | 14 +-- .../scala/spark/scheduler/ResultTask.scala | 6 +- .../spark/scheduler/ShuffleMapTask.scala | 6 +- .../main/scala/spark/scheduler/Stage.scala | 2 +- .../scala/spark/storage/BlockManager.scala | 2 +- .../scala/spark/storage/StorageUtils.scala | 2 +- .../test/scala/spark/CheckpointSuite.scala | 110 +++++++++--------- core/src/test/scala/spark/RDDSuite.scala | 13 ++- core/src/test/scala/spark/ShuffleSuite.scala | 2 +- core/src/test/scala/spark/SortingSuite.scala | 10 +- .../spark/scheduler/DAGSchedulerSuite.scala | 22 ++-- .../spark/scheduler/TaskContextSuite.scala | 10 +- 48 files changed, 405 insertions(+), 390 deletions(-) rename core/src/main/scala/spark/{Split.scala => Partition.scala} (84%) rename core/src/main/scala/spark/rdd/{MapPartitionsWithSplitRDD.scala => MapPartitionsWithIndexRDD.scala} (57%) diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index fa0ba4a573..094e57dacb 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -14,11 +14,11 @@ object Bagel extends Logging { combiner: Combiner[M, C], aggregator: Option[Aggregator[V, A]], partitioner: Partitioner, - numSplits: Int + numPartitions: Int )( compute: (V, Option[C], Option[A], Int) => (V, Array[M]) ): RDD[(K, V)] = { - val splits = if (numSplits != 0) numSplits else sc.defaultParallelism + val splits = if 
(numPartitions != 0) numPartitions else sc.defaultParallelism
     var superstep = 0
     var verts = vertices
 
@@ -56,12 +56,12 @@ object Bagel extends Logging {
     messages: RDD[(K, M)],
     combiner: Combiner[M, C],
     partitioner: Partitioner,
-    numSplits: Int
+    numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, partitioner, numSplits)(
+      sc, vertices, messages, combiner, None, partitioner, numPartitions)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
@@ -70,13 +70,13 @@ object Bagel extends Logging {
     vertices: RDD[(K, V)],
     messages: RDD[(K, M)],
     combiner: Combiner[M, C],
-    numSplits: Int
+    numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
-    val part = new HashPartitioner(numSplits)
+    val part = new HashPartitioner(numPartitions)
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, part, numSplits)(
+      sc, vertices, messages, combiner, None, part, numPartitions)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
@@ -84,13 +84,13 @@ object Bagel extends Logging {
     sc: SparkContext,
     vertices: RDD[(K, V)],
     messages: RDD[(K, M)],
-    numSplits: Int
+    numPartitions: Int
   )(
     compute: (V, Option[Array[M]], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
-    val part = new HashPartitioner(numSplits)
+    val part = new HashPartitioner(numPartitions)
     run[K, V, M, Array[M], Nothing](
-      sc, vertices, messages, new DefaultCombiner(), None, part, numSplits)(
+      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
       addAggregatorArg[K, V, M, Array[M]](compute))
   }
 
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
index 03843019c0..bc32663e0f 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
@@ -16,7 +16,7 @@ import scala.xml.{XML,NodeSeq}
 object WikipediaPageRank {
   def main(args: Array[String]) {
     if (args.length < 5) {
-      System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numSplits> <host> <usePartitioner>")
+      System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
       System.exit(-1)
     }
 
@@ -25,7 +25,7 @@ object WikipediaPageRank {
     val inputFile = args(0)
     val threshold = args(1).toDouble
-    val numSplits = args(2).toInt
+    val numPartitions = args(2).toInt
     val host = args(3)
     val usePartitioner = args(4).toBoolean
     val sc = new SparkContext(host, "WikipediaPageRank")
@@ -69,7 +69,7 @@ object WikipediaPageRank {
     val result = Bagel.run(
       sc, vertices, messages, combiner = new PRCombiner(),
-      numSplits = numSplits)(
+      numPartitions = numPartitions)(
       utils.computeWithCombiner(numVertices, epsilon))
 
     // Print the result
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 06cc8c748b..9d9d80d809 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -88,7 +88,7 @@ object WikipediaPageRankStandalone {
     n: Long,
     partitioner: Partitioner,
     usePartitioner: Boolean,
-    numSplits: Int
+    numPartitions: Int
   ): RDD[(String, Double)] = {
     var ranks = links.mapValues { edges => defaultRank }
     for (i <- 1 to numIterations) {
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
index 711435c333..c7b379a3fb 100644
--- a/core/src/main/scala/spark/CacheManager.scala
+++ 
b/core/src/main/scala/spark/CacheManager.scala @@ -11,13 +11,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { private val loading = new HashSet[String] /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ - def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel) + def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) logInfo("Cache key is " + key) blockManager.get(key) match { case Some(cachedValues) => - // Split is in cache, so just return its values + // Partition is in cache, so just return its values logInfo("Found partition in cache!") return cachedValues.asInstanceOf[Iterator[T]] diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala index b2a0e2b631..178d31a73b 100644 --- a/core/src/main/scala/spark/DoubleRDDFunctions.scala +++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala @@ -42,14 +42,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { /** (Experimental) Approximate operation to return the mean within a timeout. */ def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) - val evaluator = new MeanEvaluator(self.splits.size, confidence) + val evaluator = new MeanEvaluator(self.partitions.size, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) } /** (Experimental) Approximate operation to return the sum within a timeout. */ def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) - val evaluator = new SumEvaluator(self.splits.size, confidence) + val evaluator = new SumEvaluator(self.partitions.size, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) } } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 019be11ea8..4319cbd892 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -83,8 +83,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - numSplits: Int): RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) + numPartitions: Int): RDD[(K, C)] = { + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) } /** @@ -145,10 +145,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits. + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. 
*/ - def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { - reduceByKey(new HashPartitioner(numSplits), func) + def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { + reduceByKey(new HashPartitioner(numPartitions), func) } /** @@ -166,10 +166,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with into `numSplits` partitions. + * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { - groupByKey(new HashPartitioner(numSplits)) + def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = { + groupByKey(new HashPartitioner(numPartitions)) } /** @@ -287,8 +287,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { - join(other, new HashPartitioner(numSplits)) + def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + join(other, new HashPartitioner(numPartitions)) } /** @@ -305,10 +305,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * into `numSplits` partitions. + * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, new HashPartitioner(numSplits)) + def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + leftOuterJoin(other, new HashPartitioner(numPartitions)) } /** @@ -327,8 +327,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, new HashPartitioner(numSplits)) + def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + rightOuterJoin(other, new HashPartitioner(numPartitions)) } /** @@ -414,17 +414,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = { - cogroup(other, new HashPartitioner(numSplits)) + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, new HashPartitioner(numPartitions)) } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int) + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - cogroup(other1, other2, new HashPartitioner(numSplits)) + cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. 
*/ @@ -636,9 +636,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ - def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = { + def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = { val shuffled = - new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending)) + new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending)) shuffled.mapPartitions(iter => { val buf = iter.toArray if (ascending) { @@ -652,9 +652,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( private[spark] class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) { - override def getSplits = firstParent[(K, V)].splits + override def getPartitions = firstParent[(K, V)].partitions override val partitioner = firstParent[(K, V)].partitioner - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) } } @@ -662,9 +662,9 @@ private[spark] class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) extends RDD[(K, U)](prev) { - override def getSplits = firstParent[(K, V)].splits + override def getPartitions = firstParent[(K, V)].partitions override val partitioner = firstParent[(K, V)].partitioner - override def compute(split: Split, context: TaskContext) = { + override def compute(split: Partition, context: TaskContext) = { firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) } } } diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Partition.scala similarity index 84% rename from core/src/main/scala/spark/Split.scala rename to core/src/main/scala/spark/Partition.scala index 90d4b47c55..e384308ef6 100644 --- a/core/src/main/scala/spark/Split.scala +++ b/core/src/main/scala/spark/Partition.scala @@ -3,7 +3,7 @@ package spark /** * A partition of an RDD. */ -trait Split extends Serializable { +trait Partition extends Serializable { /** * Get the split's index within its parent RDD */ diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index f6e927a989..da82dfd10f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -27,7 +27,7 @@ import spark.rdd.FlatMappedRDD import spark.rdd.GlommedRDD import spark.rdd.MappedRDD import spark.rdd.MapPartitionsRDD -import spark.rdd.MapPartitionsWithSplitRDD +import spark.rdd.MapPartitionsWithIndexRDD import spark.rdd.PipedRDD import spark.rdd.SampledRDD import spark.rdd.UnionRDD @@ -49,7 +49,7 @@ import SparkContext._ * * Internally, each RDD is characterized by five main properties: * - * - A list of splits (partitions) + * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) @@ -76,13 +76,13 @@ abstract class RDD[T: ClassManifest]( // ======================================================================= /** Implemented by subclasses to compute a given partition. 
*/ - def compute(split: Split, context: TaskContext): Iterator[T] + def compute(split: Partition, context: TaskContext): Iterator[T] /** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ - protected def getSplits: Array[Split] + protected def getPartitions: Array[Partition] /** * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only @@ -91,7 +91,7 @@ abstract class RDD[T: ClassManifest]( protected def getDependencies: Seq[Dependency[_]] = deps /** Optionally overridden by subclasses to specify placement preferences. */ - protected def getPreferredLocations(split: Split): Seq[String] = Nil + protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** Optionally overridden by subclasses to specify how they are partitioned. */ val partitioner: Option[Partitioner] = None @@ -137,10 +137,10 @@ abstract class RDD[T: ClassManifest]( /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel - // Our dependencies and splits will be gotten by calling subclass's methods below, and will + // Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed private var dependencies_ : Seq[Dependency[_]] = null - @transient private var splits_ : Array[Split] = null + @transient private var partitions_ : Array[Partition] = null /** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD) @@ -159,15 +159,15 @@ abstract class RDD[T: ClassManifest]( } /** - * Get the array of splits of this RDD, taking into account whether the + * Get the array of partitions of this RDD, taking into account whether the * RDD is checkpointed or not. */ - final def splits: Array[Split] = { - checkpointRDD.map(_.splits).getOrElse { - if (splits_ == null) { - splits_ = getSplits + final def partitions: Array[Partition] = { + checkpointRDD.map(_.partitions).getOrElse { + if (partitions_ == null) { + partitions_ = getPartitions } - splits_ + partitions_ } } @@ -175,7 +175,7 @@ abstract class RDD[T: ClassManifest]( * Get the preferred location of a split, taking into account whether the * RDD is checkpointed or not. */ - final def preferredLocations(split: Split): Seq[String] = { + final def preferredLocations(split: Partition): Seq[String] = { checkpointRDD.map(_.getPreferredLocations(split)).getOrElse { getPreferredLocations(split) } @@ -186,7 +186,7 @@ abstract class RDD[T: ClassManifest]( * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. */ - final def iterator(split: Split, context: TaskContext): Iterator[T] = { + final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { @@ -197,7 +197,7 @@ abstract class RDD[T: ClassManifest]( /** * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing. 
*/ - private[spark] def computeOrReadCheckpoint(split: Split, context: TaskContext): Iterator[T] = { + private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointed) { firstParent[T].iterator(split, context) } else { @@ -227,15 +227,15 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): RDD[T] = - map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) + def distinct(numPartitions: Int): RDD[T] = + map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) - def distinct(): RDD[T] = distinct(splits.size) + def distinct(): RDD[T] = distinct(partitions.size) /** - * Return a new RDD that is reduced into `numSplits` partitions. + * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numSplits: Int): RDD[T] = new CoalescedRDD(this, numSplits) + def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions) /** * Return a sampled subset of this RDD. @@ -303,9 +303,9 @@ abstract class RDD[T: ClassManifest]( * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = { + def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = { val cleanF = sc.clean(f) - this.map(t => (cleanF(t), t)).groupByKey(numSplits) + this.map(t => (cleanF(t), t)).groupByKey(numPartitions) } /** @@ -336,14 +336,24 @@ abstract class RDD[T: ClassManifest]( preservesPartitioning: Boolean = false): RDD[U] = new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning) - /** + /** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ + def mapPartitionsWithIndex[U: ClassManifest]( + f: (Int, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = + new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) + + /** + * Return a new RDD by applying a function to each partition of this RDD, while tracking the index + * of the original partition. 
+ */ + @deprecated("use mapPartitionsWithIndex") def mapPartitionsWithSplit[U: ClassManifest]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = - new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning) + new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, @@ -471,7 +481,7 @@ abstract class RDD[T: ClassManifest]( } result } - val evaluator = new CountEvaluator(splits.size, confidence) + val evaluator = new CountEvaluator(partitions.size, confidence) sc.runApproximateJob(this, countElements, evaluator, timeout) } @@ -522,7 +532,7 @@ abstract class RDD[T: ClassManifest]( } map } - val evaluator = new GroupedCountEvaluator[T](splits.size, confidence) + val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence) sc.runApproximateJob(this, countPartition, evaluator, timeout) } @@ -537,7 +547,7 @@ abstract class RDD[T: ClassManifest]( } val buf = new ArrayBuffer[T] var p = 0 - while (buf.size < num && p < splits.size) { + while (buf.size < num && p < partitions.size) { val left = num - buf.size val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true) buf ++= res(0) @@ -657,11 +667,11 @@ abstract class RDD[T: ClassManifest]( /** * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`) - * created from the checkpoint file, and forget its old dependencies and splits. + * created from the checkpoint file, and forget its old dependencies and partitions. */ private[spark] def markCheckpointed(checkpointRDD: RDD[_]) { clearDependencies() - splits_ = null + partitions_ = null deps = null // Forget the constructor argument for dependencies too } @@ -676,15 +686,15 @@ abstract class RDD[T: ClassManifest]( } /** A description of this RDD and its recursive dependencies for debugging. */ - def toDebugString(): String = { + def toDebugString: String = { def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = { - Seq(prefix + rdd + " (" + rdd.splits.size + " splits)") ++ + Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++ rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " ")) } debugString(this).mkString("\n") } - override def toString(): String = "%s%s[%d] at %s".format( + override def toString: String = "%s%s[%d] at %s".format( Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala index a4a4ebaf53..d00092e984 100644 --- a/core/src/main/scala/spark/RDDCheckpointData.scala +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -16,7 +16,7 @@ private[spark] object CheckpointState extends Enumeration { /** * This class contains all the information related to RDD checkpointing. Each instance of this class * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as, - * manages the post-checkpoint state by providing the updated splits, iterator and preferred locations + * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations * of the checkpointed RDD. 
*/ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) @@ -67,11 +67,11 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _) val newRDD = new CheckpointRDD[T](rdd.context, path) - // Change the dependencies and splits of the RDD + // Change the dependencies and partitions of the RDD RDDCheckpointData.synchronized { cpFile = Some(path) cpRDD = Some(newRDD) - rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and splits + rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions cpState = Checkpointed RDDCheckpointData.clearTaskCaches() logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id) @@ -79,15 +79,15 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) } // Get preferred location of a split after checkpointing - def getPreferredLocations(split: Split): Seq[String] = { + def getPreferredLocations(split: Partition): Seq[String] = { RDDCheckpointData.synchronized { cpRDD.get.preferredLocations(split) } } - def getSplits: Array[Split] = { + def getPartitions: Array[Partition] = { RDDCheckpointData.synchronized { - cpRDD.get.splits + cpRDD.get.partitions } } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 047b57dc1f..f299b7ea46 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -614,14 +614,14 @@ class SparkContext( * Run a job on all partitions in an RDD and return the results in an array. */ def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { - runJob(rdd, func, 0 until rdd.splits.size, false) + runJob(rdd, func, 0 until rdd.partitions.size, false) } /** * Run a job on all partitions in an RDD and return the results in an array. */ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { - runJob(rdd, func, 0 until rdd.splits.size, false) + runJob(rdd, func, 0 until rdd.partitions.size, false) } /** @@ -632,7 +632,7 @@ class SparkContext( processPartition: (TaskContext, Iterator[T]) => U, resultHandler: (Int, U) => Unit) { - runJob[T, U](rdd, processPartition, 0 until rdd.splits.size, false, resultHandler) + runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler) } /** @@ -644,7 +644,7 @@ class SparkContext( resultHandler: (Int, U) => Unit) { val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter) - runJob[T, U](rdd, processFunc, 0 until rdd.splits.size, false, resultHandler) + runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler) } /** @@ -696,7 +696,7 @@ class SparkContext( /** Default level of parallelism to use when not given by user (e.g. 
for reduce tasks) */ def defaultParallelism: Int = taskScheduler.defaultParallelism - /** Default min number of splits for Hadoop RDDs when not given by user */ + /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinSplits: Int = math.min(defaultParallelism, 2) private var nextShuffleId = new AtomicInteger(0) diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index 2810631b41..da3cb2cd31 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -44,7 +44,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -53,9 +53,9 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav fromRDD(srdd.filter(x => f(x).booleanValue())) /** - * Return a new RDD that is reduced into `numSplits` partitions. + * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numSplits)) + def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions)) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 55dc755358..df3af3817d 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -54,7 +54,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -63,9 +63,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) /** - * Return a new RDD that is reduced into `numSplits` partitions. + * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numSplits)) + def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions)) /** * Return a sampled subset of this RDD. @@ -122,8 +122,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], - numSplits: Int): JavaPairRDD[K, C] = - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) + numPartitions: Int): JavaPairRDD[K, C] = + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) /** * Merge the values for each key using an associative reduce function. 
This will also perform @@ -162,10 +162,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits. + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ - def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] = - fromRDD(rdd.reduceByKey(func, numSplits)) + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] = + fromRDD(rdd.reduceByKey(func, numPartitions)) /** * Group the values for each key in the RDD into a single sequence. Allows controlling the @@ -176,10 +176,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with into `numSplits` partitions. + * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] = - fromRDD(groupByResultToJava(rdd.groupByKey(numSplits))) + def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = + fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) /** * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` @@ -261,8 +261,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] = - fromRDD(rdd.join(other, numSplits)) + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = + fromRDD(rdd.join(other, numPartitions)) /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -277,10 +277,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * into `numSplits` partitions. + * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] = - fromRDD(rdd.leftOuterJoin(other, numSplits)) + def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] = + fromRDD(rdd.leftOuterJoin(other, numPartitions)) /** * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the @@ -297,8 +297,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] = - fromRDD(rdd.rightOuterJoin(other, numSplits)) + def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] = + fromRDD(rdd.rightOuterJoin(other, numPartitions)) /** * Return the key-value pairs in this RDD to the master as a Map. 
@@ -362,16 +362,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])] - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits))) + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] + = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int) + def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = - fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits))) + fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) /** Alias for cogroup. */ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index 23e7ae2726..3ccd6f055e 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -30,7 +30,7 @@ JavaRDDLike[T, JavaRDD[T]] { /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -39,9 +39,9 @@ JavaRDDLike[T, JavaRDD[T]] { wrapRDD(rdd.filter((x => f(x).booleanValue()))) /** - * Return a new RDD that is reduced into `numSplits` partitions. + * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits) + def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index d34d56d169..90b45cf875 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -4,7 +4,7 @@ import java.util.{List => JList} import scala.Tuple2 import scala.collection.JavaConversions._ -import spark.{SparkContext, Split, RDD, TaskContext} +import spark.{SparkContext, Partition, RDD, TaskContext} import spark.api.java.JavaPairRDD._ import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import spark.partial.{PartialResult, BoundedDouble} @@ -20,7 +20,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround def rdd: RDD[T] /** Set of partitions in this RDD. */ - def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq) + def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq) /** The [[spark.SparkContext]] that this RDD was created on. 
*/ def context: SparkContext = rdd.context @@ -36,7 +36,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. */ - def iterator(split: Split, taskContext: TaskContext): java.util.Iterator[T] = + def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split, taskContext)) // Transformations (return a new RDD) @@ -146,12 +146,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] implicit val vcm: ClassManifest[JList[T]] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]] - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm) + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm) } /** @@ -333,6 +333,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround /** A description of this RDD and its recursive dependencies for debugging. */ def toDebugString(): String = { - rdd.toDebugString() + rdd.toDebugString } } diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index ab8351e55e..8c73477384 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -32,11 +32,11 @@ private[spark] class PythonRDD[T: ClassManifest]( this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, broadcastVars, accumulator) - override def getSplits = parent.splits + override def getPartitions = parent.partitions override val partitioner = if (preservePartitoning) parent.partitioner else None - override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = { + override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME") val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py")) @@ -65,7 +65,7 @@ private[spark] class PythonRDD[T: ClassManifest]( SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) val dOut = new DataOutputStream(proc.getOutputStream) - // Split index + // Partition index dOut.writeInt(split.index) // sparkFilesDir PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dOut) @@ -155,8 +155,8 @@ private class PythonException(msg: String) extends Exception(msg) */ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Array[Byte], Array[Byte])](prev) { - override def getSplits = prev.splits - override def compute(split: Split, context: TaskContext) = + override def getPartitions = prev.partitions + override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { case Seq(a, b) => (a, b) case x => throw new Exception("PairwiseRDD: unexpected value: " + x) diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala 
index 24b4909380..de2dce161a 100644 --- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala +++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala @@ -20,7 +20,7 @@ private[spark] class ApproximateActionListener[T, U, R]( extends JobListener { val startTime = System.currentTimeMillis() - val totalTasks = rdd.splits.size + val totalTasks = rdd.partitions.size var finishedTasks = 0 var failure: Option[Exception] = None // Set if the job has failed (permanently) var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index 17989c5ce5..7348c4f15b 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -1,9 +1,9 @@ package spark.rdd import scala.collection.mutable.HashMap -import spark.{RDD, SparkContext, SparkEnv, Split, TaskContext} +import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext} -private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split { +private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition { val index = idx } @@ -18,14 +18,14 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St HashMap(blockIds.zip(locations):_*) } - override def getSplits: Array[Split] = (0 until blockIds.size).map(i => { - new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] + override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => { + new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition] }).toArray - override def compute(split: Split, context: TaskContext): Iterator[T] = { + override def compute(split: Partition, context: TaskContext): Iterator[T] = { val blockManager = SparkEnv.get.blockManager - val blockId = split.asInstanceOf[BlockRDDSplit].blockId + val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { case Some(block) => block.asInstanceOf[Iterator[T]] case None => @@ -33,8 +33,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St } } - override def getPreferredLocations(split: Split): Seq[String] = - locations_(split.asInstanceOf[BlockRDDSplit].blockId) + override def getPreferredLocations(split: Partition): Seq[String] = + locations_(split.asInstanceOf[BlockRDDPartition].blockId) } diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 41cbbd0093..38600b8be4 100644 --- a/core/src/main/scala/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -5,22 +5,22 @@ import spark._ private[spark] -class CartesianSplit( +class CartesianPartition( idx: Int, @transient rdd1: RDD[_], @transient rdd2: RDD[_], s1Index: Int, s2Index: Int - ) extends Split { - var s1 = rdd1.splits(s1Index) - var s2 = rdd2.splits(s2Index) + ) extends Partition { + var s1 = rdd1.partitions(s1Index) + var s2 = rdd2.partitions(s2Index) override val index: Int = idx @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { // Update the reference to parent split at the time of task serialization - s1 = rdd1.splits(s1Index) - s2 = rdd2.splits(s2Index) + s1 = rdd1.partitions(s1Index) + s2 = rdd2.partitions(s2Index) oos.defaultWriteObject() } } @@ -33,35 +33,35 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( extends RDD[Pair[T, U]](sc, Nil) with Serializable { - val numSplitsInRdd2 = 
rdd2.splits.size + val numPartitionsInRdd2 = rdd2.partitions.size - override def getSplits: Array[Split] = { + override def getPartitions: Array[Partition] = { // create the cross product split - val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) - for (s1 <- rdd1.splits; s2 <- rdd2.splits) { - val idx = s1.index * numSplitsInRdd2 + s2.index - array(idx) = new CartesianSplit(idx, rdd1, rdd2, s1.index, s2.index) + val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size) + for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) { + val idx = s1.index * numPartitionsInRdd2 + s2.index + array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index) } array } - override def getPreferredLocations(split: Split): Seq[String] = { - val currSplit = split.asInstanceOf[CartesianSplit] + override def getPreferredLocations(split: Partition): Seq[String] = { + val currSplit = split.asInstanceOf[CartesianPartition] rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) } - override def compute(split: Split, context: TaskContext) = { - val currSplit = split.asInstanceOf[CartesianSplit] + override def compute(split: Partition, context: TaskContext) = { + val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( new NarrowDependency(rdd1) { - def getParents(id: Int): Seq[Int] = List(id / numSplitsInRdd2) + def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2) }, new NarrowDependency(rdd2) { - def getParents(id: Int): Seq[Int] = List(id % numSplitsInRdd2) + def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2) } ) diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 3558d4673f..36bfb0355e 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -9,7 +9,7 @@ import org.apache.hadoop.fs.Path import java.io.{File, IOException, EOFException} import java.text.NumberFormat -private[spark] class CheckpointRDDSplit(val index: Int) extends Split {} +private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} /** * This RDD represents a RDD checkpoint file (similar to HadoopRDD). 
@@ -20,27 +20,27 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) - override def getSplits: Array[Split] = { + override def getPartitions: Array[Partition] = { val dirContents = fs.listStatus(new Path(checkpointPath)) val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted - val numSplits = splitFiles.size + val numPartitions = splitFiles.size if (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || - !splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1))) { + !splitFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1))) { throw new SparkException("Invalid checkpoint directory: " + checkpointPath) } - Array.tabulate(numSplits)(i => new CheckpointRDDSplit(i)) + Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i)) } checkpointData = Some(new RDDCheckpointData[T](this)) checkpointData.get.cpFile = Some(checkpointPath) - override def getPreferredLocations(split: Split): Seq[String] = { + override def getPreferredLocations(split: Partition): Seq[String] = { val status = fs.getFileStatus(new Path(checkpointPath)) val locations = fs.getFileBlockLocations(status, 0, status.getLen) locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost") } - override def compute(split: Split, context: TaskContext): Iterator[T] = { + override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) CheckpointRDD.readFromFile(file, context) } @@ -107,7 +107,7 @@ private[spark] object CheckpointRDD extends Logging { deserializeStream.asIterator.asInstanceOf[Iterator[T]] } - // Test whether CheckpointRDD generate expected number of splits despite + // Test whether CheckpointRDD generate expected number of partitions despite // each split file having multiple blocks. This needs to be run on a // cluster (mesos or standalone) using HDFS. 
def main(args: Array[String]) { @@ -120,8 +120,8 @@ private[spark] object CheckpointRDD extends Logging { val fs = path.getFileSystem(new Configuration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) - assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same") - assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same") + assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") + assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") fs.delete(path) } } diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 868ee5a39f..5200fb6b65 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -5,7 +5,7 @@ import java.util.{HashMap => JHashMap} import scala.collection.JavaConversions import scala.collection.mutable.ArrayBuffer -import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext} +import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Partition, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} @@ -14,13 +14,13 @@ private[spark] sealed trait CoGroupSplitDep extends Serializable private[spark] case class NarrowCoGroupSplitDep( rdd: RDD[_], splitIndex: Int, - var split: Split + var split: Partition ) extends CoGroupSplitDep { @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { // Update the reference to parent split at the time of task serialization - split = rdd.splits(splitIndex) + split = rdd.partitions(splitIndex) oos.defaultWriteObject() } } @@ -28,7 +28,7 @@ private[spark] case class NarrowCoGroupSplitDep( private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep private[spark] -class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable { +class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Partition with Serializable { override val index: Int = idx override def hashCode(): Int = idx } @@ -58,17 +58,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) } } - override def getSplits: Array[Split] = { - val array = new Array[Split](part.numPartitions) + override def getPartitions: Array[Partition] = { + val array = new Array[Partition](part.numPartitions) for (i <- 0 until array.size) { - // Each CoGroupSplit will have a dependency per contributing RDD - array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) => + // Each CoGroupPartition will have a dependency per contributing RDD + array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) => // Assume each RDD contributed a single dependency, and get it dependencies(j) match { case s: ShuffleDependency[_, _] => new ShuffleCoGroupSplitDep(s.shuffleId) case _ => - new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i)) + new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } }.toList) } @@ -77,8 +77,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) override val partitioner = Some(part) - override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { - val split = s.asInstanceOf[CoGroupSplit] + override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { + val split = s.asInstanceOf[CoGroupPartition] val numRdds = 
split.deps.size // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) val map = new JHashMap[K, Seq[ArrayBuffer[Any]]] diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index fcd26da43a..0d16cf6e85 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -1,19 +1,19 @@ package spark.rdd -import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Split, TaskContext} +import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} -private[spark] case class CoalescedRDDSplit( +private[spark] case class CoalescedRDDPartition( index: Int, @transient rdd: RDD[_], parentsIndices: Array[Int] - ) extends Split { - var parents: Seq[Split] = parentsIndices.map(rdd.splits(_)) + ) extends Partition { + var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { // Update the reference to parent split at the time of task serialization - parents = parentsIndices.map(rdd.splits(_)) + parents = parentsIndices.map(rdd.partitions(_)) oos.defaultWriteObject() } } @@ -31,21 +31,21 @@ class CoalescedRDD[T: ClassManifest]( maxPartitions: Int) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { - val prevSplits = prev.splits + override def getPartitions: Array[Partition] = { + val prevSplits = prev.partitions if (prevSplits.length < maxPartitions) { - prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) } + prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) } } else { (0 until maxPartitions).map { i => val rangeStart = (i * prevSplits.length) / maxPartitions val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions - new CoalescedRDDSplit(i, prev, (rangeStart until rangeEnd).toArray) + new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray) }.toArray } } - override def compute(split: Split, context: TaskContext): Iterator[T] = { - split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap { parentSplit => + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + split.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentSplit => firstParent[T].iterator(parentSplit, context) } } @@ -53,7 +53,7 @@ class CoalescedRDD[T: ClassManifest]( override def getDependencies: Seq[Dependency[_]] = { Seq(new NarrowDependency(prev) { def getParents(id: Int): Seq[Int] = - splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices + partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices }) } diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala index 93e398ea2b..c84ec39d21 100644 --- a/core/src/main/scala/spark/rdd/FilteredRDD.scala +++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala @@ -1,16 +1,16 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, Split, TaskContext} +import spark.{OneToOneDependency, RDD, Partition, TaskContext} private[spark] class FilteredRDD[T: ClassManifest]( prev: RDD[T], f: T => Boolean) extends RDD[T](prev) { - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions override val partitioner = prev.partitioner // Since filter cannot change a 
partition's keys - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).filter(f) } diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala index 8c2a610593..8ebc778925 100644 --- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala +++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} private[spark] @@ -9,8 +9,8 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( f: T => TraversableOnce[U]) extends RDD[U](prev) { - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).flatMap(f) } diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala index 70b9b4e34e..e16c7ba881 100644 --- a/core/src/main/scala/spark/rdd/GlommedRDD.scala +++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala @@ -1,12 +1,12 @@ package spark.rdd -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev) { - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = Array(firstParent[T].iterator(split, context).toArray).iterator } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 854993737b..8139a2a40c 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -15,14 +15,14 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext} +import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext} /** * A Spark split class that wraps around a Hadoop InputSplit. 
*/ -private[spark] class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) - extends Split { +private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit) + extends Partition { val inputSplit = new SerializableWritable[InputSplit](s) @@ -47,12 +47,12 @@ class HadoopRDD[K, V]( // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) - override def getSplits: Array[Split] = { + override def getPartitions: Array[Partition] = { val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, minSplits) - val array = new Array[Split](inputSplits.size) + val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { - array(i) = new HadoopSplit(id, i, inputSplits(i)) + array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } @@ -62,8 +62,8 @@ class HadoopRDD[K, V]( .asInstanceOf[InputFormat[K, V]] } - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[HadoopSplit] + override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] { + val split = theSplit.asInstanceOf[HadoopPartition] var reader: RecordReader[K, V] = null val conf = confBroadcast.value.value @@ -106,9 +106,9 @@ class HadoopRDD[K, V]( } } - override def getPreferredLocations(split: Split): Seq[String] = { + override def getPreferredLocations(split: Partition): Seq[String] = { // TODO: Filtering out "localhost" in case of file:// URLs - val hadoopSplit = split.asInstanceOf[HadoopSplit] + val hadoopSplit = split.asInstanceOf[HadoopPartition] hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost") } diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala index 7b0b4525c7..d283c5b2bb 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} private[spark] @@ -13,8 +13,8 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = f(firstParent[T].iterator(split, context)) -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala similarity index 57% rename from core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala rename to core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala index c6dc1080a9..afb7504ba1 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala @@ -1,24 +1,24 @@ package spark.rdd -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} /** - * A variant of the MapPartitionsRDD that passes the split index into the + * A variant of the MapPartitionsRDD that passes the partition index into the * closure. 
This can be used to generate or collect partition specific * information such as the number of tuples in a partition. */ private[spark] -class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest]( +class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean ) extends RDD[U](prev) { - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions override val partitioner = if (preservesPartitioning) prev.partitioner else None - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = f(split.index, firstParent[T].iterator(split, context)) -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala index 6074f411e3..af07311b6d 100644 --- a/core/src/main/scala/spark/rdd/MappedRDD.scala +++ b/core/src/main/scala/spark/rdd/MappedRDD.scala @@ -1,13 +1,13 @@ package spark.rdd -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} private[spark] class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U) extends RDD[U](prev) { - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Split, context: TaskContext) = + override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f) } diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 345ae79d74..ebd4c3f0e2 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -7,12 +7,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ -import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext} +import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext} private[spark] -class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) - extends Split { +class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) + extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) @@ -39,19 +39,19 @@ class NewHadoopRDD[K, V]( @transient private val jobId = new JobID(jobtrackerId, id) - override def getSplits: Array[Split] = { + override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray - val result = new Array[Split](rawSplits.size) + val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { - result(i) = new NewHadoopSplit(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } result } - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[NewHadoopSplit] + override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] { + val split = theSplit.asInstanceOf[NewHadoopPartition] val conf = confBroadcast.value.value val attemptId = new 
TaskAttemptID(jobtrackerId, id, true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) @@ -83,8 +83,8 @@ class NewHadoopRDD[K, V]( } } - override def getPreferredLocations(split: Split): Seq[String] = { - val theSplit = split.asInstanceOf[NewHadoopSplit] + override def getPreferredLocations(split: Partition): Seq[String] = { + val theSplit = split.asInstanceOf[NewHadoopPartition] theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost") } } diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala index e703794787..07585a88ce 100644 --- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala @@ -3,20 +3,20 @@ package spark.rdd import scala.collection.immutable.NumericRange import scala.collection.mutable.ArrayBuffer import scala.collection.Map -import spark.{RDD, TaskContext, SparkContext, Split} +import spark.{RDD, TaskContext, SparkContext, Partition} -private[spark] class ParallelCollectionSplit[T: ClassManifest]( +private[spark] class ParallelCollectionPartition[T: ClassManifest]( val rddId: Long, val slice: Int, values: Seq[T]) - extends Split with Serializable { + extends Partition with Serializable { def iterator: Iterator[T] = values.iterator override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt override def equals(other: Any): Boolean = other match { - case that: ParallelCollectionSplit[_] => (this.rddId == that.rddId && this.slice == that.slice) + case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && this.slice == that.slice) case _ => false } @@ -34,15 +34,15 @@ private[spark] class ParallelCollectionRDD[T: ClassManifest]( // instead. // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal. 
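As an aside (illustrative sketch, not part of this change): the even-range arithmetic that ParallelCollectionRDD.slice relies on here, and that CoalescedRDD.getPartitions above uses to group parent partitions, can be written as a standalone helper. The name sliceEvenly is hypothetical, and the real ParallelCollectionRDD.slice additionally special-cases Range and NumericRange inputs, which this sketch omits:

    // Split a sequence into numSlices contiguous, nearly equal partitions.
    // Slice i covers indices [i*n/numSlices, (i+1)*n/numSlices), so every
    // element lands in exactly one slice even when numSlices does not divide n.
    def sliceEvenly[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
      require(numSlices >= 1, "need at least one slice")
      val n = seq.length
      (0 until numSlices).map { i =>
        seq.slice((i * n) / numSlices, ((i + 1) * n) / numSlices)
      }
    }

    // sliceEvenly(1 to 10, 3) yields slices of sizes 3, 3 and 4.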
-  override def getSplits: Array[Split] = {
+  override def getPartitions: Array[Partition] = {
     val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
-    slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
+    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
   }
 
-  override def compute(s: Split, context: TaskContext) =
-    s.asInstanceOf[ParallelCollectionSplit[T]].iterator
+  override def compute(s: Partition, context: TaskContext) =
+    s.asInstanceOf[ParallelCollectionPartition[T]].iterator
 
-  override def getPreferredLocations(s: Split): Seq[String] = {
+  override def getPreferredLocations(s: Partition): Seq[String] = {
     locationPrefs.getOrElse(s.index, Nil)
   }
 }
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index d1553181c1..f2f4fd56d1 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -1,9 +1,9 @@
 package spark.rdd
 
-import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext}
+import spark.{NarrowDependency, RDD, SparkEnv, Partition, TaskContext}
 
-class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
+class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition {
   override val index = idx
 }
 
@@ -16,15 +16,15 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
   extends NarrowDependency[T](rdd) {
 
   @transient
-  val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
-    .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split }
+  val partitions: Array[Partition] = rdd.partitions.filter(s => partitionFilterFunc(s.index))
+    .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
 
   override def getParents(partitionId: Int) = List(partitions(partitionId).index)
 }
 
 /**
- * A RDD used to prune RDD partitions/splits so we can avoid launching tasks on
+ * An RDD used to prune RDD partitions so we can avoid launching tasks on
  * all partitions. An example use case: If we know the RDD is partitioned by range,
 * and the execution DAG has a filter on the key, we can avoid launching tasks
 * on partitions that don't have the range covering the key.
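Concretely, mirroring the PartitionPruningRDD test in RDDSuite further down (the input RDD and predicate here are examples only):

    // Keep only the partitions whose index passes the predicate. With ten
    // partitions of one element each, only the tenth partition survives,
    // so only one task is launched.
    val data = sc.parallelize(1 to 10, 10)
    val pruned = new PartitionPruningRDD(data, splitNum => splitNum > 8)
    assert(pruned.partitions.size == 1)
    assert(pruned.collect().toList == List(10))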
@@ -34,9 +34,9 @@ class PartitionPruningRDD[T: ClassManifest]( @transient partitionFilterFunc: Int => Boolean) extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) { - override def compute(split: Split, context: TaskContext) = firstParent[T].iterator( - split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context) + override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator( + split.asInstanceOf[PartitionPruningRDDPartition].parentSplit, context) - override protected def getSplits: Array[Split] = + override protected def getPartitions: Array[Partition] = getDependencies.head.asInstanceOf[PruneDependency[T]].partitions } diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala index 56032a8659..962a1b21ad 100644 --- a/core/src/main/scala/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/spark/rdd/PipedRDD.scala @@ -8,7 +8,7 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.io.Source -import spark.{RDD, SparkEnv, Split, TaskContext} +import spark.{RDD, SparkEnv, Partition, TaskContext} /** @@ -27,9 +27,9 @@ class PipedRDD[T: ClassManifest]( // using a standard StringTokenizer (i.e. by spaces) def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command)) - override def getSplits: Array[Split] = firstParent[T].splits + override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Split, context: TaskContext): Iterator[String] = { + override def compute(split: Partition, context: TaskContext): Iterator[String] = { val pb = new ProcessBuilder(command) // Add the environmental variables to the process. val currentEnvVars = pb.environment() diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala index f2a144e2e0..243673f151 100644 --- a/core/src/main/scala/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/spark/rdd/SampledRDD.scala @@ -5,10 +5,10 @@ import java.util.Random import cern.jet.random.Poisson import cern.jet.random.engine.DRand -import spark.{RDD, Split, TaskContext} +import spark.{RDD, Partition, TaskContext} private[spark] -class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { +class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable { override val index: Int = prev.index } @@ -19,16 +19,16 @@ class SampledRDD[T: ClassManifest]( seed: Int) extends RDD[T](prev) { - override def getSplits: Array[Split] = { + override def getPartitions: Array[Partition] = { val rg = new Random(seed) - firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt)) + firstParent[T].partitions.map(x => new SampledRDDPartition(x, rg.nextInt)) } - override def getPreferredLocations(split: Split): Seq[String] = - firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) + override def getPreferredLocations(split: Partition): Seq[String] = + firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDPartition].prev) - override def compute(splitIn: Split, context: TaskContext): Iterator[T] = { - val split = splitIn.asInstanceOf[SampledRDDSplit] + override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { + val split = splitIn.asInstanceOf[SampledRDDPartition] if (withReplacement) { // For large datasets, the expected number of occurrences of each element in a sample with // replacement is Poisson(frac). 
We use that to get a count for each element. diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index bf69b5150b..c2f118305f 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,9 +1,9 @@ package spark.rdd -import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext} +import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext} import spark.SparkContext._ -private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { +private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { override val index = idx override def hashCode(): Int = idx } @@ -22,11 +22,11 @@ class ShuffledRDD[K, V]( override val partitioner = Some(part) - override def getSplits: Array[Split] = { - Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + override def getPartitions: Array[Partition] = { + Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i)) } - override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = { + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index) } diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index ebc0068228..2c52a67e22 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -1,13 +1,13 @@ package spark.rdd import scala.collection.mutable.ArrayBuffer -import spark.{Dependency, RangeDependency, RDD, SparkContext, Split, TaskContext} +import spark.{Dependency, RangeDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} -private[spark] class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int) - extends Split { +private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int) + extends Partition { - var split: Split = rdd.splits(splitIndex) + var split: Partition = rdd.partitions(splitIndex) def iterator(context: TaskContext) = rdd.iterator(split, context) @@ -18,7 +18,7 @@ private[spark] class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], splitIn @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { // Update the reference to parent split at the time of task serialization - split = rdd.splits(splitIndex) + split = rdd.partitions(splitIndex) oos.defaultWriteObject() } } @@ -28,11 +28,11 @@ class UnionRDD[T: ClassManifest]( @transient var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { - val array = new Array[Split](rdds.map(_.splits.size).sum) + override def getPartitions: Array[Partition] = { + val array = new Array[Partition](rdds.map(_.partitions.size).sum) var pos = 0 - for (rdd <- rdds; split <- rdd.splits) { - array(pos) = new UnionSplit(pos, rdd, split.index) + for (rdd <- rdds; split <- rdd.partitions) { + array(pos) = new UnionPartition(pos, rdd, split.index) pos += 1 } array @@ -42,15 +42,15 @@ class UnionRDD[T: ClassManifest]( val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) { - deps += new RangeDependency(rdd, 0, pos, rdd.splits.size) - pos += rdd.splits.size + deps += new RangeDependency(rdd, 0, pos, 
rdd.partitions.size) + pos += rdd.partitions.size } deps } - override def compute(s: Split, context: TaskContext): Iterator[T] = - s.asInstanceOf[UnionSplit[T]].iterator(context) + override def compute(s: Partition, context: TaskContext): Iterator[T] = + s.asInstanceOf[UnionPartition[T]].iterator(context) - override def getPreferredLocations(s: Split): Seq[String] = - s.asInstanceOf[UnionSplit[T]].preferredLocations() + override def getPreferredLocations(s: Partition): Seq[String] = + s.asInstanceOf[UnionPartition[T]].preferredLocations() } diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index 1ce70268bb..e80ec17aa5 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -1,17 +1,17 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, SparkContext, Split, TaskContext} +import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} -private[spark] class ZippedSplit[T: ClassManifest, U: ClassManifest]( +private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( idx: Int, @transient rdd1: RDD[T], @transient rdd2: RDD[U] - ) extends Split { + ) extends Partition { - var split1 = rdd1.splits(idx) - var split2 = rdd1.splits(idx) + var split1 = rdd1.partitions(idx) + var split2 = rdd1.partitions(idx) override val index: Int = idx def splits = (split1, split2) @@ -19,8 +19,8 @@ private[spark] class ZippedSplit[T: ClassManifest, U: ClassManifest]( @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { // Update the reference to parent split at the time of task serialization - split1 = rdd1.splits(idx) - split2 = rdd2.splits(idx) + split1 = rdd1.partitions(idx) + split2 = rdd2.partitions(idx) oos.defaultWriteObject() } } @@ -31,24 +31,24 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( var rdd2: RDD[U]) extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - override def getSplits: Array[Split] = { - if (rdd1.splits.size != rdd2.splits.size) { + override def getPartitions: Array[Partition] = { + if (rdd1.partitions.size != rdd2.partitions.size) { throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") } - val array = new Array[Split](rdd1.splits.size) - for (i <- 0 until rdd1.splits.size) { - array(i) = new ZippedSplit(i, rdd1, rdd2) + val array = new Array[Partition](rdd1.partitions.size) + for (i <- 0 until rdd1.partitions.size) { + array(i) = new ZippedPartition(i, rdd1, rdd2) } array } - override def compute(s: Split, context: TaskContext): Iterator[(T, U)] = { - val (split1, split2) = s.asInstanceOf[ZippedSplit[T, U]].splits + override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { + val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context)) } - override def getPreferredLocations(s: Split): Seq[String] = { - val (split1, split2) = s.asInstanceOf[ZippedSplit[T, U]].splits + override def getPreferredLocations(s: Partition): Seq[String] = { + val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2)) } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 319eef6978..bf0837c066 100644 --- 
a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -106,7 +106,7 @@ class DAGScheduler( private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { - val blockIds = rdd.splits.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray + val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map { locations => locations.map(_.ip).toList }.toArray @@ -141,9 +141,9 @@ class DAGScheduler( private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { if (shuffleDep != None) { // Kind of ugly: need to register RDDs with the cache and map output tracker here - // since we can't do it in the RDD constructor because # of splits is unknown + // since we can't do it in the RDD constructor because # of partitions is unknown logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") - mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size) + mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) } val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) @@ -162,7 +162,7 @@ class DAGScheduler( if (!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since - // we can't do it in its constructor because # of splits is unknown + // we can't do it in its constructor because # of partitions is unknown for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => @@ -257,7 +257,7 @@ class DAGScheduler( { val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] - val partitions = (0 until rdd.splits.size).toArray + val partitions = (0 until rdd.partitions.size).toArray eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener)) return listener.awaitResult() // Will throw an exception if the job fails } @@ -386,7 +386,7 @@ class DAGScheduler( try { SparkEnv.set(env) val rdd = job.finalStage.rdd - val split = rdd.splits(job.partitions(0)) + val split = rdd.partitions(job.partitions(0)) val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0) try { val result = job.func(taskContext, rdd.iterator(split, taskContext)) @@ -672,7 +672,7 @@ class DAGScheduler( return cached } // If the RDD has some placement preferences (as is the case for input RDDs), get those - val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList + val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs != Nil) { return rddPrefs } diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index 8cd4c661eb..1721f78f48 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -67,7 +67,7 @@ private[spark] class ResultTask[T, U]( var split = if (rdd == null) { null } else { - rdd.splits(partition) + rdd.partitions(partition) } override def run(attemptId: Long): U = { @@ -85,7 +85,7 @@ private[spark] class ResultTask[T, U]( override def writeExternal(out: ObjectOutput) { RDDCheckpointData.synchronized { - split = rdd.splits(partition) + split = rdd.partitions(partition) out.writeInt(stageId) val bytes = ResultTask.serializeInfo( stageId, rdd, 
func.asInstanceOf[(TaskContext, Iterator[_]) => _]) @@ -107,6 +107,6 @@ private[spark] class ResultTask[T, U]( func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U] partition = in.readInt() val outputId = in.readInt() - split = in.readObject().asInstanceOf[Split] + split = in.readObject().asInstanceOf[Partition] } } diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index bed9f1864f..59ee3c0a09 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -86,12 +86,12 @@ private[spark] class ShuffleMapTask( var split = if (rdd == null) { null } else { - rdd.splits(partition) + rdd.partitions(partition) } override def writeExternal(out: ObjectOutput) { RDDCheckpointData.synchronized { - split = rdd.splits(partition) + split = rdd.partitions(partition) out.writeInt(stageId) val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep) out.writeInt(bytes.length) @@ -112,7 +112,7 @@ private[spark] class ShuffleMapTask( dep = dep_ partition = in.readInt() generation = in.readLong() - split = in.readObject().asInstanceOf[Split] + split = in.readObject().asInstanceOf[Partition] } override def run(attemptId: Long): MapStatus = { diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 374114d870..552061e46b 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -28,7 +28,7 @@ private[spark] class Stage( extends Logging { val isShuffleMap = shuffleDep != None - val numPartitions = rdd.splits.size + val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 2e7db60841..2462721fb8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -513,7 +513,7 @@ class BlockManager( } } - // Split local and remote blocks. Remote blocks are further split into FetchRequests of size + // Partition local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. 
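For intuition, the size-capped grouping described by the comment below can be sketched in isolation (illustration only; packIntoRequests is a hypothetical name, and the real BlockManager logic also separates local from remote blocks and pipelines the requests):

    // Pack (blockId, size) pairs into groups whose total size stays at or
    // under maxBytesInFlight; a single oversized block still gets its own group.
    def packIntoRequests(
        blocks: Seq[(String, Long)],
        maxBytesInFlight: Long): Seq[Seq[(String, Long)]] = {
      val groups = new scala.collection.mutable.ArrayBuffer[List[(String, Long)]]
      var current: List[(String, Long)] = Nil
      var bytes = 0L
      for (block <- blocks) {
        if (current.nonEmpty && bytes + block._2 > maxBytesInFlight) {
          groups += current.reverse  // flush the group built so far
          current = Nil
          bytes = 0L
        }
        current = block :: current
        bytes += block._2
      }
      if (current.nonEmpty) groups += current.reverse
      groups
    }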
val remoteRequests = new ArrayBuffer[FetchRequest] for ((address, blockInfos) <- blocksByAddress) { diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index 5f72b67b2c..dec47a9d41 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -63,7 +63,7 @@ object StorageUtils { val rddName = Option(rdd.name).getOrElse(rddKey) val rddStorageLevel = rdd.getStorageLevel - RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize) + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize) }.toArray } diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 51ff966ae4..3e5ffa81d6 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -34,7 +34,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { testCheckpointing(_.sample(false, 0.5, 0)) testCheckpointing(_.glom()) testCheckpointing(_.mapPartitions(_.map(_.toString))) - testCheckpointing(r => new MapPartitionsWithSplitRDD(r, + testCheckpointing(r => new MapPartitionsWithIndexRDD(r, (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false )) testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) @@ -43,14 +43,14 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("ParallelCollection") { val parCollection = sc.makeRDD(1 to 4, 2) - val numSplits = parCollection.splits.size + val numPartitions = parCollection.partitions.size parCollection.checkpoint() assert(parCollection.dependencies === Nil) val result = parCollection.collect() assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result) assert(parCollection.dependencies != Nil) - assert(parCollection.splits.length === numSplits) - assert(parCollection.splits.toList === parCollection.checkpointData.get.getSplits.toList) + assert(parCollection.partitions.length === numPartitions) + assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList) assert(parCollection.collect() === result) } @@ -59,13 +59,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) val blockRDD = new BlockRDD[String](sc, Array(blockId)) - val numSplits = blockRDD.splits.size + val numPartitions = blockRDD.partitions.size blockRDD.checkpoint() val result = blockRDD.collect() assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result) assert(blockRDD.dependencies != Nil) - assert(blockRDD.splits.length === numSplits) - assert(blockRDD.splits.toList === blockRDD.checkpointData.get.getSplits.toList) + assert(blockRDD.partitions.length === numPartitions) + assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList) assert(blockRDD.collect() === result) } @@ -79,9 +79,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("UnionRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - // Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed. 
+ // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed. // Current implementation of UnionRDD has transient reference to parent RDDs, - // so only the splits will reduce in serialized size, not the RDD. + // so only the partitions will reduce in serialized size, not the RDD. testCheckpointing(_.union(otherRDD), false, true) testParentCheckpointing(_.union(otherRDD), false, true) } @@ -91,21 +91,21 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { testCheckpointing(new CartesianRDD(sc, _, otherRDD)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the splits. + // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, + // so only the RDD will reduce in serialized size, not the partitions. testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false) - // Test that the CartesianRDD updates parent splits (CartesianRDD.s1/s2) after - // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. + // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after + // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. // Note that this test is very specific to the current implementation of CartesianRDD. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val cartesian = new CartesianRDD(sc, ones, ones) val splitBeforeCheckpoint = - serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit]) + serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) cartesian.count() // do the checkpointing val splitAfterCheckpoint = - serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit]) + serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) assert( (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) && (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2), @@ -117,24 +117,24 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { testCheckpointing(_.coalesce(2)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the splits. + // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, + // so only the RDD will reduce in serialized size, not the partitions. testParentCheckpointing(_.coalesce(2), true, false) - // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after - // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. - // Note that this test is very specific to the current implementation of CoalescedRDDSplits + // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after + // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. 
+ // Note that this test is very specific to the current implementation of CoalescedRDDPartitions val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val coalesced = new CoalescedRDD(ones, 2) val splitBeforeCheckpoint = - serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit]) + serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) coalesced.count() // do the checkpointing val splitAfterCheckpoint = - serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit]) + serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) assert( splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head, - "CoalescedRDDSplit.parents not updated after parent RDD checkpointed" + "CoalescedRDDPartition.parents not updated after parent RDD checkpointed" ) } @@ -156,8 +156,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed - // Current implementation of ZippedRDDSplit has transient references to parent RDDs, - // so only the RDD will reduce in serialized size, not the splits. + // Current implementation of ZippedRDDPartitions has transient references to parent RDDs, + // so only the RDD will reduce in serialized size, not the partitions. testParentCheckpointing( rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) } @@ -165,21 +165,21 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { /** * Test checkpointing of the final RDD generated by the given operation. By default, * this method tests whether the size of serialized RDD has reduced after checkpointing or not. - * It can also test whether the size of serialized RDD splits has reduced after checkpointing or - * not, but this is not done by default as usually the splits do not refer to any RDD and + * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or + * not, but this is not done by default as usually the partitions do not refer to any RDD and * therefore never store the lineage. 
 */
  def testCheckpointing[U: ClassManifest](
      op: (RDD[Int]) => RDD[U],
      testRDDSize: Boolean = true,
-     testRDDSplitSize: Boolean = false
+     testRDDPartitionSize: Boolean = false
    ) {
    // Generate the final RDD using given RDD operation
    val baseRDD = generateLongLineageRDD()
    val operatedRDD = op(baseRDD)
    val parentRDD = operatedRDD.dependencies.headOption.orNull
    val rddType = operatedRDD.getClass.getSimpleName
-   val numSplits = operatedRDD.splits.length
+   val numPartitions = operatedRDD.partitions.length
 
    // Find serialized sizes before and after the checkpoint
    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
@@ -193,11 +193,11 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
 
-    // Test whether the splits have been changed to the new Hadoop splits
-    assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList)
+    // Test whether the partitions have been changed to the new Hadoop partitions
+    assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
 
-    // Test whether the number of splits is same as before
-    assert(operatedRDD.splits.length === numSplits)
+    // Test whether the number of partitions is same as before
+    assert(operatedRDD.partitions.length === numPartitions)
 
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
@@ -215,18 +215,18 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
       )
     }
 
-    // Test whether serialized size of the splits has reduced. If the splits
-    // do not have any non-transient reference to another RDD or another RDD's splits, it
+    // Test whether serialized size of the partitions has reduced. If the partitions
+    // do not have any non-transient reference to another RDD or another RDD's partitions, it
     // does not refer to a lineage and therefore may not reduce in size after checkpointing.
-    // However, if the original splits before checkpointing do refer to a parent RDD, the splits
+    // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
     // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
-    // replaced with the HadoopSplits of the checkpointed RDD.
-    if (testRDDSplitSize) {
-      logInfo("Size of " + rddType + " splits "
+    // replaced with the HadoopPartitions of the checkpointed RDD.
+    if (testRDDPartitionSize) {
+      logInfo("Size of " + rddType + " partitions "
        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
      assert(
        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-       "Size of " + rddType + " splits did not reduce after checkpointing " +
+       "Size of " + rddType + " partitions did not reduce after checkpointing " +
        "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
      )
    }
@@ -235,13 +235,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
 /**
  * Test whether checkpointing of the parent of the generated RDD also
  * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
- * RDDs splits. So even if the parent RDD is checkpointed and its splits changed,
- * this RDD will remember the splits and therefore potentially the whole lineage.
+ * RDDs partitions.
So even if the parent RDD is checkpointed and its partitions changed, + * this RDD will remember the partitions and therefore potentially the whole lineage. */ def testParentCheckpointing[U: ClassManifest]( op: (RDD[Int]) => RDD[U], testRDDSize: Boolean, - testRDDSplitSize: Boolean + testRDDPartitionSize: Boolean ) { // Generate the final RDD using given RDD operation val baseRDD = generateLongLineageRDD() @@ -250,9 +250,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val rddType = operatedRDD.getClass.getSimpleName val parentRDDType = parentRDD.getClass.getSimpleName - // Get the splits and dependencies of the parent in case they're lazily computed + // Get the partitions and dependencies of the parent in case they're lazily computed parentRDD.dependencies - parentRDD.splits + parentRDD.partitions // Find serialized sizes before and after the checkpoint val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) @@ -275,16 +275,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { ) } - // Test whether serialized size of the splits has reduced because of its parent being - // checkpointed. If the splits do not have any non-transient reference to another RDD - // or another RDD's splits, it does not refer to a lineage and therefore may not reduce - // in size after checkpointing. However, if the splits do refer to the *splits* of a parent - // RDD, then these splits must update reference to the parent RDD splits as the parent RDD's - // splits must have changed after checkpointing. - if (testRDDSplitSize) { + // Test whether serialized size of the partitions has reduced because of its parent being + // checkpointed. If the partitions do not have any non-transient reference to another RDD + // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce + // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent + // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's + // partitions must have changed after checkpointing. + if (testRDDPartitionSize) { assert( splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " splits did not reduce after checkpointing parent " + parentRDDType + + "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" ) } @@ -321,12 +321,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } /** - * Get serialized sizes of the RDD and its splits, in order to test whether the size shrinks + * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint. 
*/ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length, - Utils.serialize(rdd.splits).length) + Utils.serialize(rdd.partitions).length) } /** diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index ffa866de75..9739ba869b 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -33,6 +33,11 @@ class RDDSuite extends FunSuite with LocalSparkContext { } assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7))) + val partitionSumsWithIndex = nums.mapPartitionsWithIndex { + case(split, iter) => Iterator((split, iter.reduceLeft(_ + _))) + } + assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7))) + intercept[UnsupportedOperationException] { nums.filter(_ > 5).reduce(_ + _) } @@ -97,12 +102,12 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("caching with failures") { sc = new SparkContext("local", "test") - val onlySplit = new Split { override def index: Int = 0 } + val onlySplit = new Partition { override def index: Int = 0 } var shouldFail = true val rdd = new RDD[Int](sc, Nil) { - override def getSplits: Array[Split] = Array(onlySplit) + override def getPartitions: Array[Partition] = Array(onlySplit) override val getDependencies = List[Dependency[_]]() - override def compute(split: Split, context: TaskContext): Iterator[Int] = { + override def compute(split: Partition, context: TaskContext): Iterator[Int] = { if (shouldFail) { throw new Exception("injected failure") } else { @@ -168,7 +173,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. 
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) - assert(prunedRdd.splits.size === 1) + assert(prunedRdd.partitions.size === 1) val prunedData = prunedRdd.collect() assert(prunedData.size === 1) assert(prunedData(0) === 10) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 50f2b294bf..92c3f67416 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -222,7 +222,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() val file = sc.textFile(emptyDir.getAbsolutePath) - assert(file.splits.size == 0) + assert(file.partitions.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index edb8c839fc..495f957e53 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -19,7 +19,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey() - assert(sorted.splits.size === 2) + assert(sorted.partitions.size === 2) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -29,17 +29,17 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey(true, 1) - assert(sorted.splits.size === 1) + assert(sorted.partitions.size === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } - test("large array with many splits") { + test("large array with many partitions") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey(true, 20) - assert(sorted.splits.size === 20) + assert(sorted.partitions.size === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -59,7 +59,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) } - test("sort descending with many splits") { + test("sort descending with many partitions") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 83663ac702..8de490eb86 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -24,7 +24,7 @@ import spark.MapOutputTracker import spark.RDD import spark.SparkContext import spark.SparkException -import spark.Split +import spark.Partition import spark.TaskContext import spark.TaskEndReason @@ -144,18 +144,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar * so we can test that DAGScheduler does not try to execute RDDs locally. 
*/ def makeRdd( - numSplits: Int, + numPartitions: Int, dependencies: List[Dependency[_]], locations: Seq[Seq[String]] = Nil ): MyRDD = { - val maxSplit = numSplits - 1 + val maxPartition = numPartitions - 1 return new MyRDD(sc, dependencies) { - override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") - override def getSplits() = (0 to maxSplit).map(i => new Split { + override def getPartitions = (0 to maxPartition).map(i => new Partition { override def index = i }).toArray - override def getPreferredLocations(split: Split): Seq[String] = + override def getPreferredLocations(split: Partition): Seq[String] = if (locations.isDefinedAt(split.index)) locations(split.index) else @@ -295,11 +295,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar * collect the result of the job via callbacks from DAGScheduler. */ def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): (JobWaiter[Int], Array[Int]) = { - val resultArray = new Array[Int](rdd.splits.size) + val resultArray = new Array[Int](rdd.partitions.size) val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, jobComputeFunc, - (0 to (rdd.splits.size - 1)), + (0 to (rdd.partitions.size - 1)), "test-site", allowLocal, (i: Int, value: Int) => resultArray(i) = value @@ -355,10 +355,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("local job") { val rdd = new MyRDD(sc, Nil) { - override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = Array(42 -> 0).iterator - override def getSplits() = Array( new Split { override def index = 0 } ) - override def getPreferredLocations(split: Split) = Nil + override def getPartitions = Array( new Partition { override def index = 0 } ) + override def getPreferredLocations(split: Partition) = Nil override def toString = "DAGSchedulerSuite Local RDD" } submitRdd(rdd, true) diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala index a5db7103f5..647bcaf860 100644 --- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.BeforeAndAfter import spark.TaskContext import spark.RDD import spark.SparkContext -import spark.Split +import spark.Partition import spark.LocalSparkContext class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { @@ -14,8 +14,8 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte var completed = false sc = new SparkContext("local", "test") val rdd = new RDD[String](sc, List()) { - override def getSplits = Array[Split](StubSplit(0)) - override def compute(split: Split, context: TaskContext) = { + override def getPartitions = Array[Partition](StubPartition(0)) + override def compute(split: Partition, context: TaskContext) = { context.addOnCompleteCallback(() => completed = true) sys.error("failed") } @@ -28,5 +28,5 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte assert(completed === true) } - case class StubSplit(val index: Int) extends Split -} \ No newline at end of file + case class StubPartition(val index: Int) extends Partition +}
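As a closing illustration (not part of this change): after the rename, a minimal custom RDD is written against getPartitions, Partition, and compute. The sketch below is modeled on the stub RDDs in the test suites above; ConstantRDD is a hypothetical example class that yields one value per partition:

    import spark.{Partition, RDD, SparkContext, TaskContext}

    // A trivial RDD with numPartitions partitions, each producing `value` once.
    class ConstantRDD(sc: SparkContext, value: Int, numPartitions: Int)
      extends RDD[Int](sc, Nil) {

      override def getPartitions: Array[Partition] = {
        val array = new Array[Partition](numPartitions)
        for (i <- 0 until numPartitions) {
          array(i) = new Partition { override val index = i }
        }
        array
      }

      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.single(value)
    }

    // new ConstantRDD(sc, 42, 4).collect() returns Array(42, 42, 42, 42)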